1. Ever seen a product which has duplicated mirrored APIs for synchronous and asynchronous processing? I never liked such APIs as they introduce extra noise to what otherwise could be considered a clean design. There is really no point to have myMethod() and myMethodAsync() methods while all you are trying to do is to change the mode of method execution from synchronous to asynchronous.

    So how do we approach this problem? How do we make the same API operate synchronously or asynchronously by simply flipping a switch? I believe that Apache Ignite project came up with a very neat and elegant solution to this problem.

    Enhance the Java Futures

    First of all, we need to enhance standard Java futures. Standard java.util.concurrent.Future allows you to cancel a task and wait for its completion synchronously, but it misses the whole point of asynchronous execution, which is to be notified about the completion of the operation asynchronously. Java 8 actually addresses this problem with CompletableFuture abstraction, however, it does a lot more than that and may be an overkill for most of the programming tasks.

    Apache Ignite has an API called IgniteFuture which extends standard java.util.concurrent.Future and adds ability to register asynchronous callbacks and chain callbacks one after another:
    public interface IgniteFuture<V> extends Future<V> {
        ...
    
        /**
         * Registers listener closure to be asynchronously notified whenever future completes.
         *
         * @param lsnr Listener closure to register. If not provided - this method is no-op.
         */
        public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr);
    
        /**
         * Make a chained future to convert result of this future (when complete) into a new format.
         * It is guaranteed that done callback will be called only ONCE.
         *
         * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
         * @return Chained future that finishes after this future completes and done callback is called.
         */
        public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb);
        
        ...
    }
    
    Now that we have the truly asynchronous futures, let's see how we can avoid duplicity of synchronous and asynchronous APIs.

    IgniteAsyncSupport

    By default, all API invocations in Apache Ignite are synchronous. From usability standpoint this makes sense, as most of the time we all utilize synchronous APIs and resort to asynchronous ones only when we really have to.

    For whenever we need asynchronous behavior, Apache Ignite has IgniteAsyncSupport interface which is a parent to all the APIs that require both, synchronous and asynchronous mode of operation. In Ignite, such APIs usually have to do with distributed operations and may take longer to comlete, like storing data in distributed caches, or executing a distributed computation.

    The main method here is IgniteAsyncSupport.withAsync() which switches any API into asynchronous mode of operation. Whenever asynchronous mode is enabled, the APIs will always store a future for every previous call on per-thread basis. This way, after having invoked an API in asynchronous mode, you can always get the IgniteFuture for that call and listen for the result asynchronously.

    Here is an example of synchronous and asynchronous computations on the cluster.

    Synchronous Compute:
    // IgniteCompute has synchronous and asynchronous modes.
    IgniteCompute compute = ignite.compute();
    
    // Execute a job synchronously and wait for the result.
    String res = compute.call(() -> "Hello world");
    

    Asynchronous Compute:
    // Enable asynchronous mode (note that the same IgniteCompute API is used).
    IgniteCompute asyncCompute = ignite.compute().withAsync();
    
    // Asynchronously execute a job.
    asyncCompute.call(() -> "Hello world");
    
    // Get the future for the above invocation.
    IgniteFuture<String> fut = asyncCompute.future();
    
    // Asynchronously listen for completion and print out the result.
    fut.listen(f -> System.out.println("Job result: " + f.get()));

    Here is an example of a how asynchronous mode is enabled for distributed caches.

    Synchronous Cache:
    IgniteCache<String, Integer> cache = ignite.jcache("mycache");
    
    // Synchronously store value in cache and get previous value.
    Integer val = cache.getAndPut("1", 1);
    
    Asynchronous Cache:
    // Enable asynchronous mode (note that the same IgniteCache API is used).
    IgniteCache<String, Integer> asyncCache = ignite.jcache("mycache").withAsync();
    
    // Asynchronously store value in cache.
    asyncCache.getAndPut("1", 1);
    
    // Get future for the above invocation.
    IgniteFuture<Integer> fut = asyncCache.future();
    
    // Asynchronously listen for the operation to complete.
    fut.listenAsync(f -> System.out.println("Previous cache value: " + f.get()));
    

    See Ignite documentation for more information about Ignite Asynchronous Mode.



    Apache Ignite is a distributed In-Memory Data Fabric which allows to distribute and cache data in memory, perform distributed computations, streaming, etc... Since most of the supported functionality in Ignite is distributed, having properly implemented asynchronous mode of operation becomes very critical.
    1

    View comments


  2. An easy-to-manage network cluster is a cluster in which all nodes are equal and can be brought up with identical configuration. However, even though all nodes are equal, it often still makes sense to assign application-specific roles to them, like "workers', "clients", or "data-nodes". In Apache Ignite, this concept is called cluster groups.

    Apache Ignite is an In-Memory Data Fabric composed of multiple distributed components with Clustering APIs serving as the main backbone for the rest of the components, including Data Grid, Compute Grid, and Service Grid. I am one of the committers to this project and generally blog quite a bit about it.

    You can create virtual cluster groups in Ignite based on any application-specific custom filter. However, to make things easier, Ignite comes with some predefined filters.


    Select Remote Nodes

    Here is how you can execute a simple closure on all remote nodes. Remote nodes include all cluster members, except for the member who is starting the execution.

    final Ignite ignite = Ignition.ignite();
    
    IgniteCluster cluster = ignite.cluster();
    
    // Get compute instance which will only execute
    // over remote nodes, i.e. not this node.
    IgniteCompute compute = ignite.compute(cluster.forRemotes());
    
    // Broadcast to all remote nodes and print the ID of the node 
    // on which this closure is executing.
    compute.broadcast(() -> System.out.println("Hello Node: " + cluster.localNode().id());
    


    Select Worker Nodes

    You can assign application specific roles to cluster members, like "masters" and "workers", for example. This can be done via user attributes specified on node startup. For example, here is how you can bring up a cluster node with "ROLE" attribute set to "worker":

    IgniteConfiguration cfg = new IgniteConfiguration();
    
    Map<String,String> attrs = Collections.singletonMap("ROLE", "worker");
    
    cfg.setUserAttributes(attrs);
    
    // Start Ignite node.
    Ignite ignite = Ignition.start(cfg);
    

    Then here is how you would execute a closure only over nodes with role "worker":

    IgniteCluster cluster = ignite.cluster();
    
    // Get compute instance which will only execute over "worker" nodes.
    IgniteCompute compute = ignite.compute(cluster.forAttribute("ROLE", "worker"));
    
    // Broadcast to all "worker" nodes and print the ID of the node 
    // on which this closure is executing.
    compute.broadcast(() -> System.out.println("Hello Node: " + cluster.localNode().id());
    


    Custom Cluster Groups

    And finally, you can create custom cluster groups based on any user-defined predicates. Such cluster groups will always only include the nodes that pass the predicate. For example, here is how we would create a group of cluster nodes that have CPU utilization less than 50%:

    // Nodes with less than 50% CPU load.
    ClusterGroup readyNodes = cluster.forPredicate((node) -> node.metrics().getCurrentCpuLoad() < 0.5);
    
    // Broadcast to all nodes with CPU load less than 50% and
    // print the ID of the node on which this closure is executing.
    compute.broadcast(() -> System.out.println("Hello Node: " + cluster.localNode().id());
    

    For more on cluster groups, visit Ignite Cluster Groups documentation.
    5

    View comments

About me
About me
- Antoine de Saint-Exupery -
- Antoine de Saint-Exupery -
"A designer knows he has achieved perfection not when there is nothing left to add, but when there is nothing left to take away."
Blog Archive
Blogs I frequent
Loading
Dynamic Views theme. Powered by Blogger.