Ever seen a product that has mirrored APIs for synchronous and asynchronous processing? I never liked such APIs, as they add noise to what could otherwise be a clean design. There is really no point in having both myMethod() and myMethodAsync() methods when all you are trying to do is change the mode of method execution from synchronous to asynchronous.
So how do we approach this problem? How do we make the same API operate synchronously or asynchronously by simply flipping a switch? I believe that the Apache Ignite project came up with a very neat and elegant solution to this problem.
Enhance the Java Futures
First of all, we need to enhance standard Java futures. The standard java.util.concurrent.Future allows you to cancel a task and wait for its completion synchronously, but it misses the whole point of asynchronous execution, which is to be notified about the completion of the operation asynchronously. Java 8 addresses this problem with the CompletableFuture abstraction; however, it does a lot more than that and may be overkill for most programming tasks.
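To make the difference concrete, here is a minimal sketch that uses only the JDK. The class and method names (FutureComparison, blockingStyle, callbackStyle) are made up for this illustration. It shows that a plain Future can only be waited on, while a CompletableFuture lets you attach a callback that runs when the result is ready.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of any library.
class FutureComparison {
    /** Plain Future: the only way to obtain the result is to block in get(). */
    static String blockingStyle() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<String> fut = exec.submit(() -> "Hello world");
            return fut.get(); // The calling thread waits here.
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    /** CompletableFuture: the transformation runs as a callback on completion. */
    static CompletableFuture<String> callbackStyle() {
        return CompletableFuture
            .supplyAsync(() -> "Hello world")
            .thenApply(res -> "Job result: " + res);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());                    // prints "Hello world"
        callbackStyle().thenAccept(System.out::println).join(); // prints "Job result: Hello world"
    }
}
```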
Apache Ignite has an API called IgniteFuture, which extends the standard java.util.concurrent.Future and adds the ability to register asynchronous callbacks and chain callbacks one after another:
public interface IgniteFuture<V> extends Future<V> {
    ...
    /**
     * Registers listener closure to be asynchronously notified whenever future completes.
     *
     * @param lsnr Listener closure to register. If not provided - this method is no-op.
     */
    public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr);

    /**
     * Make a chained future to convert result of this future (when complete) into a new format.
     * It is guaranteed that done callback will be called only ONCE.
     *
     * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
     * @return Chained future that finishes after this future completes and done callback is called.
     */
    public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb);
    ...
}
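To show what listen() and chain() mean in practice without pulling in Ignite, here is a rough, self-contained sketch. SimpleFuture is a hypothetical stand-in built on the JDK's CompletableFuture; it is not how Ignite implements IgniteFuture. It only mirrors the two methods from the interface above, using java.util.function types in place of Ignite's closures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for IgniteFuture; an illustration, not Ignite code. */
class SimpleFuture<V> {
    private final CompletableFuture<V> delegate = new CompletableFuture<>();

    /** Completes the future, which triggers the registered callbacks. */
    void complete(V val) { delegate.complete(val); }

    /** Blocks until the value is available. */
    V get() { return delegate.join(); }

    /** Registers a listener that receives this future once it completes. */
    void listen(Consumer<? super SimpleFuture<V>> lsnr) {
        if (lsnr == null)
            return; // Mirrors the documented no-op for a missing listener.
        delegate.whenComplete((val, err) -> lsnr.accept(this));
    }

    /** Returns a new future that holds the result of applying doneCb to this one. */
    <T> SimpleFuture<T> chain(Function<? super SimpleFuture<V>, T> doneCb) {
        SimpleFuture<T> chained = new SimpleFuture<>();
        delegate.whenComplete((val, err) -> {
            try {
                chained.delegate.complete(doneCb.apply(this));
            } catch (RuntimeException e) {
                chained.delegate.completeExceptionally(e);
            }
        });
        return chained;
    }
}

class SimpleFutureDemo {
    static String run() {
        SimpleFuture<Integer> fut = new SimpleFuture<>();
        StringBuilder log = new StringBuilder();

        fut.listen(f -> log.append("listener saw ").append(f.get()));
        SimpleFuture<String> chained = fut.chain(f -> "value=" + f.get());

        fut.complete(42); // Both callbacks run as part of completion.
        return log + "; " + chained.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "listener saw 42; value=42"
    }
}
```

Note that both callbacks receive the future itself rather than the raw value, which matches the signatures above: the callback decides when to call get().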
Now that we have truly asynchronous futures, let's see how we can avoid duplicating synchronous and asynchronous APIs.
IgniteAsyncSupport
By default, all API invocations in Apache Ignite are synchronous. From a usability standpoint this makes sense, as most of the time we use synchronous APIs and resort to asynchronous ones only when we really have to.
For the cases when we do need asynchronous behavior, Apache Ignite has the IgniteAsyncSupport interface, which is a parent to all the APIs that require both synchronous and asynchronous modes of operation. In Ignite, such APIs usually have to do with distributed operations that may take longer to complete, like storing data in distributed caches or executing a distributed computation.
The main method here is IgniteAsyncSupport.withAsync(), which switches any API into asynchronous mode of operation. Whenever asynchronous mode is enabled, the API stores the future for the previous call on a per-thread basis. This way, after having invoked an API in asynchronous mode, you can always get the IgniteFuture for that call and listen for the result asynchronously.
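The per-thread bookkeeping described above can be sketched with a ThreadLocal. Everything in this snippet is hypothetical (the Greeter class, its greet() operation, the use of CompletableFuture in place of IgniteFuture), and the details are assumptions of the sketch rather than documented Ignite behavior: here the asynchronous call returns null, and future() clears the stored future once it has been handed out.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical API with one operation; names are illustrative, not Ignite's. */
class Greeter {
    private final boolean async;

    // Future of the most recent asynchronous call made by the current thread.
    private final ThreadLocal<CompletableFuture<?>> lastFut = new ThreadLocal<>();

    Greeter() { this(false); }

    private Greeter(boolean async) { this.async = async; }

    /** Returns a view of the same API that runs operations asynchronously. */
    Greeter withAsync() { return new Greeter(true); }

    /** Sync mode: returns the result. Async mode: stores a future and returns null. */
    String greet(String name) {
        if (!async)
            return "Hello " + name;

        lastFut.set(CompletableFuture.supplyAsync(() -> "Hello " + name));
        return null; // Assumption of this sketch: the real result comes from future().
    }

    /** Returns (and clears) the future for the previous async call on this thread. */
    @SuppressWarnings("unchecked")
    <R> CompletableFuture<R> future() {
        CompletableFuture<?> fut = lastFut.get();
        if (fut == null)
            throw new IllegalStateException("No asynchronous call was made on this thread.");

        lastFut.remove();
        return (CompletableFuture<R>) fut;
    }
}

class GreeterDemo {
    static String run() {
        Greeter sync = new Greeter();
        Greeter async = sync.withAsync();

        String direct = sync.greet("sync");  // Result is returned directly.
        async.greet("async");                // Result is delivered through the future.
        CompletableFuture<String> fut = async.future();

        return direct + " / " + fut.join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Hello sync / Hello async"
    }
}
```

The design trade-off is visible even in this toy version: the method signatures stay identical in both modes, at the cost of a two-step call pattern (invoke, then fetch the future) in asynchronous mode.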
Here is an example of synchronous and asynchronous computations on the cluster.
Synchronous Compute:
// IgniteCompute has synchronous and asynchronous modes.
IgniteCompute compute = ignite.compute();
// Execute a job synchronously and wait for the result.
String res = compute.call(() -> "Hello world");
Asynchronous Compute:
// Enable asynchronous mode (note that the same IgniteCompute API is used).
IgniteCompute asyncCompute = ignite.compute().withAsync();
// Asynchronously execute a job.
asyncCompute.call(() -> "Hello world");
// Get the future for the above invocation.
IgniteFuture<String> fut = asyncCompute.future();
// Asynchronously listen for completion and print out the result.
fut.listen(f -> System.out.println("Job result: " + f.get()));
Here is an example of how asynchronous mode is enabled for distributed caches.
Synchronous Cache:
IgniteCache<String, Integer> cache = ignite.jcache("mycache");
// Synchronously store value in cache and get previous value.
Integer val = cache.getAndPut("1", 1);
Asynchronous Cache:
// Enable asynchronous mode (note that the same IgniteCache API is used).
IgniteCache<String, Integer> asyncCache = ignite.jcache("mycache").withAsync();
// Asynchronously store value in cache.
asyncCache.getAndPut("1", 1);
// Get future for the above invocation.
IgniteFuture<Integer> fut = asyncCache.future();
// Asynchronously listen for the operation to complete.
fut.listen(f -> System.out.println("Previous cache value: " + f.get()));
See the Ignite documentation for more information about Ignite Asynchronous Mode.
Apache Ignite is a distributed In-Memory Data Fabric that lets you distribute and cache data in memory, perform distributed computations, process streams, and more. Since most of the functionality in Ignite is distributed, having a properly implemented asynchronous mode of operation is critical.