Wednesday, April 8, 2015

Streaming and Transforming Data with Apache Ignite


In its 1.0 release, Apache Ignite™ added significantly improved streaming support, with the ability to perform various data transformations and to query the streamed data using standard SQL. Streaming in Ignite is generally used to ingest large, continuous volumes of data into Ignite distributed caches (optionally configured with sliding windows). Streamers can also be used simply to preload large amounts of data into caches on startup.

Here is an example of processing a stream of random numbers.

  1. The stream is partitioned across multiple cluster nodes in such a way that the same number is always processed on the same node.
  2. Upon receiving a number, our StreamTransformer gets the current count for that number and increments it by 1.

try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer("numbers")) {
    // Allow data updates.
    stmr.allowOverwrite(true);

    // Configure data transformation to count random numbers 
    // added to the stream.
    stmr.receiver(StreamTransformer.from((e, arg) -> {
        // Get current count.
        Long val = e.getValue();

        // Increment count by 1.
        e.setValue(val == null ? 1L : val + 1);

        return null;
    }));

    // Stream 10 million random numbers in the range 0 to 999.
    for (int i = 1; i <= 10_000_000; i++) {
        stmr.addData(RAND.nextInt(1000), 1L);

        if (i % 500_000 == 0)
            System.out.println("Number of tuples streamed into Ignite: " + i);
    }
}
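
To make the two steps above more concrete, here is a minimal plain-Java sketch that simulates them without a cluster. It does not use the Ignite API: the class `PartitionedCountSketch`, the `nodeFor` mapping (a simple hash modulo), and the per-node `HashMap`s are all assumptions made for illustration, and Ignite's real key-to-node mapping works differently. The only things carried over from the example are the ideas that equal keys always land on the same node and that each arrival increments that key's count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java simulation of the two steps; this is NOT the Ignite API.
final class PartitionedCountSketch {
    // Hypothetical key-to-node mapping: the same key always maps to the same node.
    static int nodeFor(int key, int nodeCnt) {
        return Math.floorMod(Integer.hashCode(key), nodeCnt);
    }

    // Same update rule as the transformer: start at 1, otherwise add 1.
    static void increment(Map<Integer, Long> nodeData, int key) {
        nodeData.merge(key, 1L, Long::sum);
    }

    // Route every key to its node and update the count stored there.
    static List<Map<Integer, Long>> stream(int[] keys, int nodeCnt) {
        List<Map<Integer, Long>> nodes = new ArrayList<>();

        for (int i = 0; i < nodeCnt; i++)
            nodes.add(new HashMap<>());

        for (int key : keys)
            increment(nodes.get(nodeFor(key, nodeCnt)), key);

        return nodes;
    }

    public static void main(String[] args) {
        Random rand = new Random();
        int[] keys = new int[100_000];

        for (int i = 0; i < keys.length; i++)
            keys[i] = rand.nextInt(1000);

        List<Map<Integer, Long>> nodes = stream(keys, 4);

        for (int n = 0; n < nodes.size(); n++)
            System.out.println("Node " + n + " holds " + nodes.get(n).size() + " distinct numbers");
    }
}
```

Because every occurrence of a number is routed to one node, each count lives in exactly one place and no cross-node coordination is needed for the increment, which is the property the partitioning in step 1 is meant to provide.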

While the data is being streamed into the system, we can also query it using standard SQL. In this case, the name of the value type ("Long" in the example below) is used as the table name.

In the query below, we select the 10 most popular numbers from the stream.

// Query the top 10 most popular numbers.
SqlFieldsQuery top10Qry = new SqlFieldsQuery(
    "select _key, _val from Long order by _val desc limit 10");

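// Execute the query and get the whole result set.
// stmCache is assumed here to be the IgniteCache<Integer, Long> for the "numbers" cache used above.
List<List<?>> top10 = stmCache.query(top10Qry).getAll();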
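
For readers less familiar with SQL, the ordering and limit in that query can be mirrored in plain Java over an ordinary map of counts. This is only a sketch of what the result means, not of how Ignite executes the query across a cluster; the class `TopCountsSketch`, its `top` method, and the sample counts are hypothetical names and values chosen for illustration.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of "order by _val desc limit N"; this is NOT the Ignite API.
final class TopCountsSketch {
    // Return the entries with the highest counts, largest first, at most 'limit' of them.
    static List<Map.Entry<Integer, Long>> top(Map<Integer, Long> counts, int limit) {
        return counts.entrySet().stream()
            .sorted(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()))
            .limit(limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up counts, standing in for the contents of the cache.
        Map<Integer, Long> counts = new HashMap<>();
        counts.put(7, 5L);
        counts.put(3, 9L);
        counts.put(8, 1L);

        for (Map.Entry<Integer, Long> e : top(counts, 2))
            System.out.println("number=" + e.getKey() + ", count=" + e.getValue());
    }
}
```

As with the SQL query, the relative order of numbers that have the same count is not defined here, so ties may come back in any order.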
