Sunday, April 5, 2015

Apache Ignite Word Count Streaming Example


In this example we will stream text into Apache Ignite and count the occurrences of each word. We will also periodically run SQL queries against the stream to find the 10 most popular words.

The example will work as follows:
  1. We will set up a cache to hold the words as they come from a stream.
  2. We will set up a 1-second sliding window to keep only the words from the last second.
  3. The StreamWords program will stream text data into Ignite.
  4. The QueryWords program will query the top 10 words out of the stream.

Cache Configuration

We define a CacheConfig class which provides the configuration used by both programs, StreamWords and QueryWords. The cache will be a partitioned cache which stores words as values. To guarantee that identical words are cached on the same data node, we use the AffinityUuid type for cache keys: every key is unique, while the word itself serves as the affinity key.
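
To illustrate the idea, here is a minimal plain-Java sketch (not Ignite code). The AffinityKeySketch class and its modulo-based partition() method are hypothetical and only model the principle; Ignite's actual affinity function works differently. The point is that two keys can be distinct and still land on the same partition when they share an affinity value.

```java
import java.util.UUID;

// Simplified stand-in for an affinity key: a unique id plus the value that decides placement.
final class AffinityKeySketch {
    final UUID id = UUID.randomUUID(); // makes every key unique
    final String affinityValue;        // decides which partition owns the key

    AffinityKeySketch(String affinityValue) {
        this.affinityValue = affinityValue;
    }

    // Hypothetical partition mapping that depends only on the affinity value.
    int partition(int partitions) {
        return Math.floorMod(affinityValue.hashCode(), partitions);
    }

    public static void main(String[] args) {
        AffinityKeySketch a = new AffinityKeySketch("alice");
        AffinityKeySketch b = new AffinityKeySketch("alice");

        // The two keys are different objects with different ids...
        System.out.println("same id: " + a.id.equals(b.id));
        // ...but they map to the same partition because the word is the same.
        System.out.println("same partition: " + (a.partition(1024) == b.partition(1024)));
    }
}
```

Because the partition depends only on the word, all occurrences of a word end up together, which is what makes a collocated "group by" on the word possible.
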

Note that in this example we use a sliding window of 1 second for our cache. This means that a word disappears from the cache 1 second after it was first put into the cache.
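import static java.util.concurrent.TimeUnit.SECONDS;

import javax.cache.configuration.FactoryBuilder;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.configuration.CacheConfiguration;

public class CacheConfig {
  public static CacheConfiguration<AffinityUuid, String> wordCache() {
    // Keys are AffinityUuid instances, values are the words themselves.
    CacheConfiguration<AffinityUuid, String> cfg = new CacheConfiguration<>("words");

    // Index individual words.
    cfg.setIndexedTypes(AffinityUuid.class, /*word type*/String.class);

    // Sliding window of 1 second: entries expire 1 second after creation.
    cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(
      new CreatedExpiryPolicy(new Duration(SECONDS, 1))));

    return cfg;
  }
}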
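
The expiry behaviour can be modelled in a few lines of plain Java. This is only a sketch of the creation-time window semantics, not how Ignite implements expiry; the SlidingWindowSketch class is hypothetical, and it assumes that timestamps are passed in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of a creation-time sliding window (not Ignite code).
final class SlidingWindowSketch {
    private static final class Entry {
        final String word;
        final long createdMillis;

        Entry(String word, long createdMillis) {
            this.word = word;
            this.createdMillis = createdMillis;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final long windowMillis;

    SlidingWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Record a word together with its creation time.
    void add(String word, long nowMillis) {
        entries.addLast(new Entry(word, nowMillis));
    }

    // Drop entries whose age has reached the window, then report how many remain.
    int size(long nowMillis) {
        while (!entries.isEmpty() && nowMillis - entries.peekFirst().createdMillis >= windowMillis)
            entries.removeFirst();

        return entries.size();
    }

    public static void main(String[] args) {
        SlidingWindowSketch window = new SlidingWindowSketch(1000);

        window.add("alice", 0);
        window.add("rabbit", 600);

        System.out.println(window.size(900));  // both entries are younger than 1 second
        System.out.println(window.size(1200)); // "alice" has expired, "rabbit" remains
        System.out.println(window.size(1700)); // nothing is left
    }
}
```

Note that reading an entry does not extend its lifetime in this model; only the creation time matters, which mirrors the intent of a created-expiry policy.
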

Stream Words

We define a StreamWords class which is responsible for continuously reading words from a local text file ("alice-in-wonderland.txt" in our case) and streaming them into the Ignite "words" cache.

Example
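import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.AffinityUuid;

public class StreamWords {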
  public static void main(String[] args) throws Exception {
    // Mark this cluster member as client.
    Ignition.setClientMode(true);

    try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
      // The cache is configured with sliding window holding 1 second of the streaming data.
      IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());

      try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
        // Stream words from "alice-in-wonderland" book.
        while (true) {
          InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");

          try (LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
            for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
              for (String word : line.split(" "))
                if (!word.isEmpty())
                  // Stream words into Ignite.
                  // By using AffinityUuid as a key, we ensure that identical
                  // words are processed on the same cluster node.
                  stmr.addData(new AffinityUuid(word), word);
            }
          }
        }
      }
    }
  }
}
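
The word-splitting step of the loop above can be tried in isolation. The sketch below uses a hypothetical WordSplitSketch helper and plain Java only; it shows why the isEmpty() check is needed when splitting on a single space.

```java
import java.util.ArrayList;
import java.util.List;

final class WordSplitSketch {
    // Split a line on single spaces and keep only non-empty tokens,
    // the same way the streaming loop does before calling addData().
    static List<String> words(String line) {
        List<String> result = new ArrayList<>();

        for (String word : line.split(" "))
            if (!word.isEmpty())
                result.add(word);

        return result;
    }

    public static void main(String[] args) {
        // Two spaces in a row produce an empty token, which the filter drops.
        System.out.println(words("Alice  was beginning"));
    }
}
```

Keep in mind that this simple split does not strip punctuation or normalize case, so "Alice" and "alice," would be counted as different words.
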

Query Words

We define a QueryWords class which will periodically query word counts from the cache.

SQL Query
  1. We use standard SQL to query popular words.
  2. Ignite SQL treats Java classes as SQL tables. Since our words are stored as the simple String type, the SQL query below queries the String table.
  3. Ignite always stores cache keys and values as "_key" and "_val" fields. In our case, "_val" is the word, so we use this syntax in our SQL query.
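
For readers less familiar with SQL, the following plain-Java sketch performs the same kind of aggregation (group by value, count, order by count descending, limit) over an in-memory list. It is an illustration only: the TopWordsSketch class is hypothetical, and it does not involve Ignite or its SQL engine.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class TopWordsSketch {
    // In-memory analogue of a "group by / count / order by desc / limit" query.
    static List<Map.Entry<String, Long>> top(List<String> values, int limit) {
        return values.stream()
            // group by the word and count occurrences
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
            .entrySet().stream()
            // order by count, highest first
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()))
            // keep only the first "limit" rows
            .limit(limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("the", "cat", "the", "hat", "the", "cat");

        System.out.println(top(values, 2)); // prints [the=3, cat=2]
    }
}
```

The order of words with equal counts is unspecified in this sketch, just as a SQL "order by cnt desc" leaves ties unordered.
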
Example
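import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.affinity.AffinityUuid;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.examples.ExamplesUtils;

public class QueryWords {
  public static void main(String[] args) throws Exception {
    // Mark this cluster member as client.
    Ignition.setClientMode(true);

    try (Ignite ignite = Ignition.start()) {
      // Key and value types match the types indexed in CacheConfig.
      IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());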

      // Select top 10 words.
      SqlFieldsQuery top10Qry = new SqlFieldsQuery(
          "select _val, count(_val) as cnt from String " + 
            "group by _val " + 
            "order by cnt desc " + 
            "limit 10",
          true /*collocated*/
      );

      // Query top 10 popular words every 5 seconds.
      while (true) {
        // Execute queries.
        List<List<?>> top10 = stmCache.query(top10Qry).getAll();

        // Print top 10 words.
        ExamplesUtils.printQueryResults(top10);

        Thread.sleep(5000);
      }
    }
  }
}

Starting Server Nodes

In order to run the example, you need to start data nodes. In Ignite, data nodes are called server nodes. You can start as many server nodes as you like, but you should have at least 1 in order to run the example.

Server nodes can be started from the command line as follows:
bin/ignite.sh
You can also start server nodes programmatically, like so:
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;

public class ExampleNodeStartup {
    public static void main(String[] args) throws IgniteException {
        Ignition.start();
    }
}
Here is what the output of the QueryWords program looks like on my MacBook Pro laptop (I started 2 server nodes and one StreamWords program as well):
...
Query results:
(the,2890)
(and,1355)
(to,1298)
(a,1139)
(of,1029)
(said,1002)
(in,912)
(she,820)
(was,766)
(you,711)
Query results:
(the,1679)
(to,830)
(and,810)
(a,680)
(of,629)
(she,491)
(it,357)
(in,330)
(said,315)
(was,274)
...
