This provides a clj friendly wrapper for CompletableFuture and adds a few utility functions to mimic manifold features. Shamelessly stole code/ideas from the awesome manifold library.
->Recur
(->Recur args)
Recur
all
(all xs)
Takes a collection of values, some of which may be futures, and
returns a future that will contain a list of realized values
source
all'
(all' fs)
Like all
buf faster if you know you’re only dealing with futures
args
source
any
(any xs)
Returns one value from a collection of futures
source
any'
(any' fs)
Like any
but faster if you know you’re only dealing with future args
source
cancel!
canceled?
catch
chain
(chain x & fns)
Composes functions starting with x as argument triggering calls to
fns for every step coercing the return values to deferreds if
necessary and returns a deferred with the final result.
source
chain'
(chain' x & fns)
Like chain but assumes fns return raw values instead of potential
futures
source
chain-futures
(chain-futures x & fs)
Like chain but takes a value and functions that will return futures
source
complete!
empty
error!
error-future
(error-future x)
Returns a new CompletableFuture that is already completed
exceptionally with the given exception.
source
error?
ex-unwrap
Takes input exception and return the original exception cause (if
any). This unwraps ExecutionException
and CompletionException
source
finally
fmap
future
(future)
(future f)
(future f executor)
No arg creates an empty/incomplete future, 1 arg creates a future that will get the return value of f as realized value on fork-join common pool, 2 arg creates a future that will be realized on ExecutorService supplied with return value of f as realized value.
The executor that is set at this stage will continue to be used for
subsequent steps (then/chain etc) if another one is not specified at
another level
source
future?
handle
let-flow
(let-flow steps & body)
Macro.
manifold.let-flow
port. It doesn’t do any fancy binding dependency analysis
like manifold does, but it’s good enough for the common use cases, not to
mention drastically simpler to implement.
A version of let
where deferred values that are let-bound or closed over can
be treated as if they are realized values. The body will only be executed once
all of the let-bound values, even ones only used for side effects, have been
computed.
source
loop
(loop bindings & body)
Macro.
A version of Clojure’s loop which allows for asynchronous loops, via
qbits.auspex/recur
. loop
will always return a CompletableFuture
Value, even if the body is synchronous. Note that loop
does
not coerce values to deferreds, actual qbits.auspex/future
s
must be used.
(loop [i 1e6]
(chain (future i)
#(if (zero? %)
%
(recur (dec %)))))
source
one
(one & xs)
Returns one value from a list of futures
source
one'
(one' & fs)
Like one
but faster if you know you’re only dealing with future args
source
realized?
recur
(recur & args)
Like recur, but to be used with qbits.auspex/loop
source
recur?
(recur? x)
success!
success-future
(success-future x)
Returns a new CompletableFuture that is already completed with the
given value.
source
then
timeout!
unwrap
(unwrap f)
Tries to deref a Future, returns a value upon completion or the
original exception that triggered exceptional termination (as
opposed to a wrapped exception)
source
when-complete
wrap
zip
(zip & xs)
Takes a list of values, some of which can be futures and returns a
future that will contains a list of realized values
source
zip'
(zip' & fs)
Like zip but faster if you know you’re only dealing with futures
args
source
cached-executor
(cached-executor)
(cached-executor thread-factory)
Creates a thread pool that creates new threads as needed, but will
reuse previously constructed threads when they are available
source
clojure-future-executor
(clojure-future-executor)
Returns the thread pool used by clojure.core/future.
source
current-thread
(current-thread)
Returns current thread
source
current-thread-executor
(current-thread-executor)
Returns an executor that will run task in calling thread
source
fixed-size-executor
(fixed-size-executor {:keys [num-threads thread-factory], :or {thread-factory (Executors/defaultThreadFactory)}})
Returns a new fixed size executor of size num-threads
.
source
fork-join-executor
(fork-join-executor)
Returns forkJoin commonPool Executor
source
single-executor
(single-executor)
(single-executor thread-factory)
Creates an Executor that uses a single worker thread operating off an
unbounded queue.
source
thread-factory
(thread-factory {:keys [fmt priority daemon]})
work-stealing-executor
(work-stealing-executor)
(work-stealing-executor parallelism)
Creates a thread pool that maintains enough threads to support the
given parallelism level, and may use multiple queues to reduce
contention. Arity 1 will have parallelism = available processors
source
biconsumer
(biconsumer f)
bifunction
(bifunction f)
consumer
(consumer f)
function
(function f)
supplier
(supplier f)
ex-unwrap
(ex-unwrap ex)
Unwraps exceptions if we have a valid ex-cause present
source
wrap
(wrap fut)
Converts CompletableFuture
to manifod.deferred
source
-cancel!
(-cancel! _)
-canceled?
(-canceled? _)
-catch
(-catch _ f)
(-catch _ f pattern)
-complete!
(-complete! _ f executor)
-empty
(-empty x)
-error!
(-error! _ val)
-error?
(-error? _)
-finally
(-finally _ f)
(-finally _ f pattern)
-fmap
(-fmap _ f)
(-fmap _ f executor)
-future?
(-future? x)
-handle
(-handle _ f)
(-handle _ f executor)
-realized?
(-realized? _)
-success!
(-success! _ val)
-then
(-then _ f)
(-then _ f executor)
-timeout!
(-timeout! _ timeout-ms)
(-timeout! _ timeout-ms timeout-val)
-when-complete
(-when-complete _ f)
(-when-complete _ f executor)
-wrap
(-wrap x)
Cancel!
Canceled?
Catch
Complete!
Empty
Error!
Error?
FMap
Finally
Future
Handle
Realized?
Success!
Then
Timeout!
WhenComplete
Wrap