Sunday, April 19, 2015

Parallel Streaming Transformations

Prior blogs have covered the basics of solrj.io. In this blog we begin to learn how to harness the parallel computing capabilities of solrj.io.

This blog introduces the ParallelStream which sends a TupleStream to worker nodes where it can be processed in parallel. Both streaming transformations and streaming aggregations can be parallelized using the ParallelStream. This blog focuses on an example of a parallel streaming transformation. Future blogs will provide examples of parallel streaming aggregation.

Worker Collections

Parallel streaming operations are handled by Solr nodes within a worker collection. Worker collections are SolrCloud collections that have been configured to handle streaming operations. The only requirement for worker collections is that the Solr nodes in the collection must be configured with a StreamHandler to handle the streaming requests.

Worker collections can hold an index with data or they can be empty collections that only exist to handle streaming operations.

Stream Partitioning

Each worker node is shuffled 1/Nth of the search results. So if there are 7 worker nodes specified, each node will be shuffled 1/7th of the search results. The partitioning is set up under the covers by the StreamHandler on the worker nodes.

Search results are hash partitioned by keys. This ensures that all results with the same key(s) are sent to the same worker node. When coupled with the ability to sort by keys, hash partitioning provides shuffling functionality that allows many streaming operations to be parallelized.
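
To make the idea concrete, here is a minimal sketch of hash partitioning in plain Java. It does not use solrj.io: the class name HashPartitionSketch, its method names and the use of String.hashCode() are assumptions made for illustration, and the hash function Solr applies internally may well differ. The only point is that equal keys always land on the same worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration; not part of solrj.io.
class HashPartitionSketch {

  // Maps a partition key to one of numWorkers buckets.
  // Math.floorMod keeps the result non-negative for negative hash codes.
  static int workerFor(String key, int numWorkers) {
    return Math.floorMod(key.hashCode(), numWorkers);
  }

  // Splits the keys into numWorkers lists, preserving input order in each list.
  static List<List<String>> partition(List<String> keys, int numWorkers) {
    List<List<String>> buckets = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      buckets.add(new ArrayList<>());
    }
    for (String key : keys) {
      buckets.get(workerFor(key, numWorkers)).add(key);
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<String> sorted = Arrays.asList("a", "a", "b", "c", "c", "c");
    List<List<String>> buckets = partition(sorted, 3);
    for (int i = 0; i < buckets.size(); i++) {
      System.out.println("worker " + i + ": " + buckets.get(i));
    }
  }
}
```

Because the input is sorted and each bucket preserves input order, every worker also sees its own share in sorted order, which is what the sorted decorators rely on.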

ParallelStream

A TupleStream decorator called the ParallelStream wraps a TupleStream and sends it to N worker nodes to be operated on in parallel.

The ParallelStream serializes the byte code of the underlying TupleStream and sends it to the workers before the TupleStream is opened. The workers deserialize the TupleStream, open the stream, read the Tuples and stream them back to the ParallelStream.

The TupleStream sent to the workers can be programmed to stream only aggregated results or the top N results back to the ParallelStream.

UniqueStream

The example below also introduces a TupleStream decorator called the UniqueStream. The UniqueStream emits a unique stream of Tuples. Tuples are de-duplicated based on a Comparator.

The example will show how the UniqueStream can be used with the ParallelStream to perform the unique operation in parallel.
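
Before looking at the Solr example, the de-duplication idea itself can be sketched in a few lines of plain Java. This is not the UniqueStream implementation; SortedUniqueSketch and its unique() method are hypothetical names. The sketch assumes the input is already sorted by the same Comparator, so it only ever needs to remember the previous element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for illustration; not part of solrj.io.
class SortedUniqueSketch {

  // Emits the first element of each run of equal elements.
  // Assumes the input is already sorted by comp.
  static <T> List<T> unique(Iterable<T> sorted, Comparator<T> comp) {
    List<T> out = new ArrayList<>();
    T previous = null;
    boolean first = true;
    for (T current : sorted) {
      // Only the previous element is kept in memory for the comparison.
      if (first || comp.compare(previous, current) != 0) {
        out.add(current);
      }
      previous = current;
      first = false;
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> sorted = Arrays.asList("a", "a", "b", "c", "c");
    System.out.println(unique(sorted, Comparator.<String>naturalOrder()));
  }
}
```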

Simple Parallel Streaming Transformation Example
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];
      String workerCollection = args[2];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA");

      //Set the partition keys
      props.put("partitionKeys", "fieldA");
      
      CloudSolrStream streamC = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      Comparator comp = new AscFieldComp("fieldA");
      UniqueStream streamU = new UniqueStream(streamC, comp);
       
      ParallelStream streamP = new ParallelStream(zkHost,
                                                  workerCollection,
                                                  streamU,
                                                  7,
                                                  comp);
                             
      try {
       
        streamP.open();
        while(true) {
          
          Tuple tuple = streamP.read();
          if(tuple.EOF) {
             break;
          }

          String uniqueValue = tuple.getString("fieldA");
          //Print all the unique values
          System.out.println(uniqueValue);
        }

      } finally {
        streamP.close();
      }
   }
}
The example above does the following things:
  1. Creates a CloudSolrStream that connects to a SolrCloud collection.
  2. In the query parameters the sort is set to fieldA asc. This will return the Tuples in ascending order based on fieldA.
  3. The query parameters also set the partitionKeys to fieldA. This will hash partition the search results on fieldA.
  4. The CloudSolrStream is then decorated with a UniqueStream. Note the Comparator which is used by the UniqueStream. In this case the AscFieldComp will cause the UniqueStream to emit unique Tuples based on the value in fieldA. The sorting and partitioning of search results on fieldA allows the UniqueStream's operation to be run in parallel on the worker nodes.
  5. The UniqueStream is decorated with a ParallelStream. Note that the ParallelStream is constructed with the following parameters:
    • zkHost
    • The name of the workerCollection
    • The TupleStream that is being sent to the workers
    • The number of workers
    • A Comparator to order the results coming back from the workers
  6. Then the ParallelStream is opened, iterated and closed.
  7. Under the covers the ParallelStream serializes the UniqueStream and sends it to 7 worker nodes. The workers open the UniqueStream, read the unique Tuples and stream them back to the ParallelStream.
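
The steps above can be simulated end to end without a cluster. The sketch below is only an illustration of why partitioning on the de-duplication key makes a per-worker unique correct. ParallelUniqueSketch is a hypothetical class, the workers run one after another rather than in parallel, and String.hashCode() stands in for whatever hash Solr uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation for illustration; not part of solrj.io.
class ParallelUniqueSketch {

  static List<String> parallelUnique(List<String> sortedValues, int numWorkers) {
    // Step 1: hash partition on the key so equal values reach the same worker.
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String v : sortedValues) {
      partitions.get(Math.floorMod(v.hashCode(), numWorkers)).add(v);
    }

    // Step 2: each worker de-duplicates its own (still sorted) partition.
    List<List<String>> workerResults = new ArrayList<>();
    for (List<String> partition : partitions) {
      List<String> uniq = new ArrayList<>();
      for (String v : partition) {
        if (uniq.isEmpty() || !uniq.get(uniq.size() - 1).equals(v)) {
          uniq.add(v);
        }
      }
      workerResults.add(uniq);
    }

    // Step 3: merge the sorted worker results by repeatedly taking the smallest head.
    List<String> merged = new ArrayList<>();
    int[] pos = new int[numWorkers];
    while (true) {
      int best = -1;
      for (int w = 0; w < numWorkers; w++) {
        if (pos[w] >= workerResults.get(w).size()) {
          continue; // this worker's results are exhausted
        }
        if (best == -1 || workerResults.get(w).get(pos[w])
            .compareTo(workerResults.get(best).get(pos[best])) < 0) {
          best = w;
        }
      }
      if (best == -1) {
        break; // every worker is exhausted
      }
      merged.add(workerResults.get(best).get(pos[best]++));
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> sorted = Arrays.asList("a", "a", "b", "b", "c", "d", "d");
    System.out.println(parallelUnique(sorted, 7));
  }
}
```

Since a given value can only ever appear on one worker, the merge in step 3 never has to de-duplicate again; it only restores the global order.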

Streaming Aggregation

The last several blogs dealt primarily with streaming transformations. There's still a lot more to talk about with streaming transformation but it's time to start exploring streaming aggregation.

Streaming aggregation deals with computing analytics from TupleStreams. Solrj.io is designed to support two models of streaming aggregation:
  • Metrics: Metric aggregations gather analytics on TupleStreams as Tuples are read without transforming the stream. When all the Tuples have been read from the stream the aggregates are placed on the final EOF Tuple. This technique is likely the preferred approach for aggregations of low to moderate cardinality.  
  • Roll-ups: Roll-up aggregations transform the TupleStream by rolling up aggregates and emitting a stream of Tuples that contain the aggregates. This is likely the preferred approach for handling aggregation of high cardinality fields and time series aggregation.
This blog deals specifically with Metric aggregation. Future blogs will demonstrate Roll-up aggregations.
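
To give a feel for the Roll-up model in the meantime, here is a rough sketch in plain Java. It is not a solrj.io class: RollupSketch and the field names "key", "value" and "sum" are invented for illustration. It assumes the rows arrive sorted by "key" and replaces each run of rows with a single row holding the sum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration; not part of solrj.io.
class RollupSketch {

  // Input: rows sorted by "key". Output: one row per key with the sum of "value".
  static List<Map<String, Object>> rollupSum(List<Map<String, Object>> sortedRows) {
    List<Map<String, Object>> out = new ArrayList<>();
    String currentKey = null;
    long sum = 0;
    for (Map<String, Object> row : sortedRows) {
      String key = (String) row.get("key");
      if (currentKey != null && !currentKey.equals(key)) {
        // The key changed, so the previous group is complete: emit it.
        out.add(aggregateRow(currentKey, sum));
        sum = 0;
      }
      currentKey = key;
      sum += (Long) row.get("value");
    }
    if (currentKey != null) {
      out.add(aggregateRow(currentKey, sum)); // emit the final group
    }
    return out;
  }

  static Map<String, Object> aggregateRow(String key, long sum) {
    Map<String, Object> m = new LinkedHashMap<>();
    m.put("key", key);
    m.put("sum", sum);
    return m;
  }

  static Map<String, Object> row(String key, long value) {
    Map<String, Object> m = new LinkedHashMap<>();
    m.put("key", key);
    m.put("value", value);
    return m;
  }

  public static void main(String[] args) {
    System.out.println(rollupSum(Arrays.asList(row("a", 1), row("a", 2), row("b", 5))));
  }
}
```

Unlike a Metric aggregation, the caller never sees the original rows here; the stream itself is transformed into a stream of aggregates.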

Custom TupleStreams

The initial solrj.io release doesn't come with TupleStream implementations that perform streaming aggregation. Future releases will almost certainly contain aggregation support, but for now we'll need to add some custom TupleStreams to demonstrate in-line aggregation.

You'll see that developing custom TupleStreams is extremely easy, and that's partially why they were left out of the initial release. Building specific aggregations is very easy. The hard part is designing a general purpose aggregation library that handles most of the general use cases.

The EOF Tuple

Metric aggregations were designed to work in concert with the EOF Tuple. You'll recall that the EOF Tuple marks the end of the stream. The EOF Tuple does not contain data from the stream, but it is designed to hold Metric aggregations.

Metric Aggregation

Metric aggregation is accomplished by decorating a TupleStream with an aggregating TupleStream that gathers metrics as the Tuples are read. When the aggregating TupleStream reads the EOF Tuple, it places its aggregates onto the EOF Tuple. The EOF Tuple will then contain all the in-line aggregates.

Simple Example

The example below will demonstrate a very simple case of in-line aggregation. This code is deliberately kept short and simplistic to demonstrate the mechanics of in-line aggregation. Future blogs will show how to do more interesting things like combining streaming transformations with streaming aggregation, parallel aggregation and using richer aggregate data structures.

The Code Sample
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");
      
      CloudSolrStream cloudStream = new CloudSolrStream(zkHost, 
                                                        collection, 
                                                        props);

      //Wrap the CloudSolrStream in a CountStream
      //The CountStream implementation is below

      CountStream countStream = new CountStream(cloudStream);
      
      try {
       
        countStream.open();
        while(true) {
          
          Tuple tuple = countStream.read();
          if(tuple.EOF) {
             long count = tuple.getLong("count");
             System.out.println("Tuple count:"+count);
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }
      
      } finally {
       countStream.close();
      }
   }
}

public class CountStream implements TupleStream {

  private TupleStream source;
  //Kept as a long so the client can read it back with getLong("count")
  private long count;

  public CountStream(TupleStream source) {
     this.source = source;
  }

  public Tuple read() throws IOException {
     Tuple tuple = source.read();
   
     if(tuple.EOF) {
       tuple.put("count", count);
       return tuple;
     }

     ++count;
     return tuple;
  }

  public List children() {
    List children = new ArrayList();
    children.add(source);
    return children;
  }

  public void setStreamContext(StreamContext context) {
    source.setStreamContext(context);
  }

  public void open() throws IOException {
    source.open();
  }

  public void close() throws IOException {
    source.close();
  }
}

The code above is doing the following things:

  • Creating a CloudSolrStream
  • Wrapping the CloudSolrStream with a CountStream
  • Opening, reading and closing the CountStream.
  • The CountStream class implements the TupleStream interface. The read() method simply reads a Tuple from the TupleStream it has wrapped and increments a counter.
  • When the EOF Tuple is encountered it places the count onto the EOF Tuple and returns it.
  • The StreamingClient reads the Tuples from the CountStream and prints the field values from each Tuple. The CountStream does not change the Tuples that are emitted from the CloudSolrStream that it has wrapped. When the StreamingClient encounters the EOF Tuple it reads the count field, which was placed there by the CountStream.

Friday, April 17, 2015

Computing the Complement of Two TupleStreams

This is the third blog in the series about Solrj.io. In the last blog, the concept of TupleStream Decorators was introduced and a simple example was shown using the ReducerStream. In this blog we'll introduce a stream Decorator called the MergeStream and see how it can be used with the ReducerStream to calculate the Complement between two streams.

The MergeStream

The MergeStream unions two TupleStreams, ordering the Tuples based on a Comparator. When used in combination with the ReducerStream this allows Tuples from different streams to be operated on as one unit. This provides the building block for a number of relational operations including intersections, complements and joins.
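
The ordered union can be pictured as a classic two-way merge of sorted inputs. The sketch below is plain Java and is not the MergeStream implementation; MergeSketch and merge() are hypothetical names, and lists stand in for streams to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for illustration; not part of solrj.io.
class MergeSketch {

  // Merges two inputs that are each sorted by comp into one sorted output.
  // Equal elements from both inputs are kept, so they end up adjacent.
  static <T> List<T> merge(List<T> a, List<T> b, Comparator<T> comp) {
    List<T> out = new ArrayList<>(a.size() + b.size());
    int i = 0;
    int j = 0;
    while (i < a.size() && j < b.size()) {
      if (comp.compare(a.get(i), b.get(j)) <= 0) {
        out.add(a.get(i++));
      } else {
        out.add(b.get(j++));
      }
    }
    // One input is exhausted; copy whatever remains of the other.
    while (i < a.size()) {
      out.add(a.get(i++));
    }
    while (j < b.size()) {
      out.add(b.get(j++));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("a", "c", "e");
    List<String> b = Arrays.asList("b", "c", "d");
    System.out.println(merge(a, b, Comparator.<String>naturalOrder()));
  }
}
```

Because equal values from the two inputs come out next to each other, a grouping step placed on top of the merge sees them as one group.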

A Simple Example: Computing the Complement

To find the complement of two sets, A and B, you need to find all the Tuples in set B that are NOT in set A. The example below will show how easy it is to compute the complement of streams of any size using the MergeStream and ReducerStream.

import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collectionA = args[1];
      String collectionB = args[2];

      //Query properties for StreamA

      Map propsA = new HashMap();
      propsA.put("q", "*:*");
      propsA.put("qt", "/export");
      propsA.put("sort", "fieldA asc");
      propsA.put("fl", "fieldA");
      
      //Construct StreamA

      CloudSolrStream streamA = new CloudSolrStream(zkHost, 
                                                    collectionA, 
                                                    propsA);

      //Include the Collection name with each tuple

      streamA.setTrace(true); 

      //Query properties for StreamB

      Map propsB = new HashMap();
      propsB.put("q", "*:*");
      propsB.put("qt", "/export");
      propsB.put("sort", "fieldA asc");
      propsB.put("fl", "fieldA");
      
      //Construct StreamB

      CloudSolrStream streamB = new CloudSolrStream(zkHost, 
                                                    collectionB, 
                                                    propsB);

      //Include the Collection name with each tuple
 
      streamB.setTrace(true);

      //Union streamA and streamB ordering by fieldA

      Comparator comp = new AscFieldComp("fieldA");  
      MergeStream streamM = new MergeStream(streamA, streamB, comp);

      //Wrap a ReducerStream around the MergeStream

      ReducerStream streamR = new ReducerStream(streamM, comp);

      try {
       
        streamR.open();
        while(true) {
          
          Tuple tuple = streamR.read();
          if(tuple.EOF) {
             break;
          }

          List<Map> maps = tuple.getMaps();
          
          boolean complement = true;
          for(Map map : maps) {

             //Trace is turned on
             //so the "_COLLECTION_" field will be set.

             String col = (String)map.get("_COLLECTION_");

             //Check to see if there is a Tuple
             //from collectionA

             if(col.equals(collectionA)) {
                complement = false;
                break;
             }             
          }

          if(complement) {
            for(Map map : maps) {
               String fieldA = (String)map.get("fieldA");
               
               //print the complement
               System.out.println("Complement:"+ fieldA);
            }
          }
        }
      } finally {
          streamR.close();
      }
   }
}
The example above does the following things:
  1. Creates two CloudSolrStreams that connect to two different collections.
  2. Notice that the CloudSolrStream.setTrace() is called so that the Tuples can be traced back to their collection.
  3. The query parameters for both streams sort on fieldA asc. So both streams will return the Tuples in ascending order based on the fieldA.
  4. The two CloudSolrStreams are wrapped with a MergeStream. Note the Comparator, which also orders by fieldA ascending. This will union the two streams ordering Tuples by fieldA.
  5. The MergeStream is then wrapped with a ReducerStream. Note again that the Comparator is on fieldA so it will group Tuples that have the same value based on fieldA.
  6. The Tuples that have been grouped together can be retrieved by the Tuple.getMaps() method. The Tuple group may contain Tuples from streamA and streamB.
  7. To compute the Complement all we need to do is look for Tuple groups that don't have a Tuple from streamA.
  8. Notice the "_COLLECTION_" field which is present because CloudSolrStream.setTrace() was set to true.
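
The same result can be reproduced on two small sorted lists without Solr. Note that the sketch below swaps in a simpler technique: instead of merging and then grouping, it walks both sorted inputs with two cursors, which yields the same complement. ComplementSketch is a hypothetical name and the code is only meant to illustrate the logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for illustration; not part of solrj.io.
class ComplementSketch {

  // Returns every value of sortedB that does not occur in sortedA.
  // Both inputs must be sorted in ascending order.
  static List<String> complement(List<String> sortedA, List<String> sortedB) {
    List<String> out = new ArrayList<>();
    int i = 0;
    for (String key : sortedB) {
      // Advance the cursor on A past every value smaller than the current B value.
      while (i < sortedA.size() && sortedA.get(i).compareTo(key) < 0) {
        i++;
      }
      boolean presentInA = i < sortedA.size() && sortedA.get(i).equals(key);
      if (!presentInA) {
        out.add(key);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("a", "c");
    List<String> b = Arrays.asList("a", "b", "c", "d");
    System.out.println(complement(a, b));
  }
}
```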

Monday, April 13, 2015

TupleStream Decorators

This is the second blog in the series about Solrj.io. The previous blog covered how to use the basic TupleStream implementations to open, read and close a TupleStream from a SolrCloud collection.

This blog will describe how to use Decorators to perform operations on TupleStreams. There are two types of operations that are typically performed on TupleStreams.

  • Streaming Transformation: These types of operations transform the underlying stream(s). Examples of streaming transformations include unique, group by, rollup, union, intersect, complement, join, etc.
  • Streaming Aggregation: These types of operations gather metrics and build aggregations on the underlying streams. Examples include sum, count, average, min, max, etc.

This blog will focus on Streaming Transformation. Follow-up blogs will cover Streaming Aggregation.

Streaming Transformation

Solrj.io comes with a core set of TupleStream Decorators that can transform underlying TupleStreams. These TupleStream Decorators wrap one or more TupleStreams and perform operations on the Tuples while reading from the underlying streams.

TupleStream Decorators implement the TupleStream interface, so they too can be wrapped by other TupleStream Decorators to build up complex operations.

Sorted Streams

Many of the TupleStream Decorators rely on the sort order of the underlying TupleStreams to perform memory efficient streaming transformations on very large streams.

For example the UniqueStream emits a stream of unique tuples based on a Comparator. The underlying TupleStream must be sorted by the same fields as this Comparator. This allows the UniqueStream to iterate very large TupleStreams and de-duplicate the Tuples using very little memory.

The ReducerStream

The Solrj.io package comes with a TupleStream Decorator called the ReducerStream. The ReducerStream can be thought of as a Swiss Army knife for streaming transformations. It can be used on its own or as a building block for performing higher level streaming transformations such as Joins.

The ReducerStream iterates over a TupleStream and buffers Tuples that are equal based on a Comparator.

This allows tuples to be grouped by common field(s). The read() method emits one Tuple per group. The fields of the emitted Tuple reflect the first tuple encountered in the group.

The Tuple.getMaps() method returns all the Tuples in the group. This method returns a list of maps (including the group head), which hold the data for each Tuple in the group.
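
The grouping behaviour can be sketched in plain Java as follows. This is not the ReducerStream source; GroupSketch and group() are hypothetical names, and the sketch assumes the input is already sorted by the same Comparator so that equal elements are adjacent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for illustration; not part of solrj.io.
class GroupSketch {

  // Splits a sorted input into runs of elements that compare as equal.
  // The first element of each run plays the role of the group head.
  static <T> List<List<T>> group(List<T> sorted, Comparator<T> comp) {
    List<List<T>> groups = new ArrayList<>();
    List<T> current = null;
    for (T item : sorted) {
      if (current == null || comp.compare(current.get(0), item) != 0) {
        // The item differs from the group head, so a new group starts here.
        current = new ArrayList<>();
        groups.add(current);
      }
      current.add(item);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> sessionIds = Arrays.asList("s1", "s1", "s2", "s3", "s3");
    System.out.println(group(sessionIds, Comparator.<String>naturalOrder()));
  }
}
```

Only one group is being built at any time, which is why a sorted input keeps the memory footprint tied to the largest group rather than to the whole stream.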


A Simple Example

The example below shows how the ReducerStream can be used to aggregate information about sessions. Session aggregation is a fairly typical use case for Hadoop.

The reason session aggregation is often performed in Hadoop is that it requires the ability to do large scale distributed grouping. This is because sessions typically consist of one or more log records that need to be processed as a unit. These records could be stored on any server in the cluster. Map/Reduce provides a method for doing this type of large scale distributed grouping.

The ReducerStream example shows how this type of operation can be done using Solrj.io.

A couple of notes before jumping into the example:

  • The example below collects all of the sessions in one place. Future blogs will show how to use SolrCloud Worker Collections to perform these types of operations in parallel.
  • The example below shows an operation which is similar in nature to Map/Reduce. It's important to understand that the ReducerStream is just one of many possible Streaming Transformations that can be performed using Solrj.io. Solrj.io is not Map/Reduce, it's a generic API for performing streaming operations.

The Code
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "sessionID asc");
      props.put("fl", "sessionID,sku,price");
      
      CloudSolrStream cstream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      Comparator comp = new AscFieldComp("sessionID");
      ReducerStream rstream = new ReducerStream(cstream, comp);

      try {
       
        rstream.open();
        long sessionCount = 0;
        while(true) {
          
          Tuple sessionTuple = rstream.read();
          if(sessionTuple.EOF) {
             break;
          }

          ++sessionCount;
          String sessionID = sessionTuple.getString("sessionID");

          List<Map> maps = sessionTuple.getMaps();
          
          double sessionPrice = 0.0;
          for(Map map : maps) {
             String recordSessionID = (String)map.get("sessionID");
             assert(sessionID.equals(recordSessionID));
             String sku = (String)map.get("sku");
             double price = (double)map.get("price");
             sessionPrice += price;
             System.out.println(sessionID+" : "+sessionPrice);
          }
        }

        System.out.println("Session Count:"+sessionCount);

      } finally {
       rstream.close();
      }
   }
}
The example above does the following things:
  1. Creates a CloudSolrStream that connects to a SolrCloud collection.
  2. In the query parameters the sort is set to sessionID asc. This will return the Tuples in ascending order based on the sessionID field.
  3. Decorates the CloudSolrStream with a ReducerStream. Note the Comparator, which is used by the ReducerStream to group the Tuples. In this case the AscFieldComp will cause the ReducerStream to group the Tuples based on the "sessionID" field.
  4. Then the ReducerStream is opened, iterated and closed.
  5. During the iteration the read() method emits one Tuple for each sessionID. The Tuple.getMaps() method is used to access all the Tuples in the session.

Tuesday, April 7, 2015

The Streaming API (Solrj.io) : The Basics

This is the first blog in a series about the new Streaming API for SolrCloud, otherwise known as Solrj.io.

What is Solrj.io?

Solrj.io is a new Solrj package that provides the programming framework for SolrCloud's new parallel computing framework.

Solrj.io is a Java API located in the org.apache.solr.client.solrj.io package.

The Base Abstractions

Solrj.io has two important base abstractions:

Tuple: A Tuple is simply a thin wrapper around a Map of key/value pairs. In its most basic form a Tuple represents a single record from a search result set.

TupleStream: The TupleStream abstracts a search result set as a stream of Tuples. The TupleStream provides a simple interface for opening, reading and closing streams of Tuples.

The Base Implementations

Solrj.io has two base TupleStream implementations:
SolrStream: Abstracts a search result set from a single Solr instance as a stream of Tuples.

CloudSolrStream: Abstracts a search result set from a SolrCloud collection as a stream of Tuples.

CloudSolrStream is a SolrCloud smart client. You provide CloudSolrStream with a zkHost and a collection name and it will automatically pick a replica from each shard to perform the query.

CloudSolrStream queries each shard and performs a streaming merge of the results based on an internal Comparator. This streaming merge allows CloudSolrStream to merge very large result sets from the shards with very little memory.
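
A streaming merge of this kind is commonly built on a priority queue that holds one head element per input. The sketch below shows that general technique in plain Java; it is an assumption about the approach, not a copy of the CloudSolrStream code, and ShardMergeSketch is a hypothetical name. Lists stand in for the per-shard streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for illustration; not part of solrj.io.
class ShardMergeSketch {

  // Merges any number of sorted shard results into one sorted output.
  static List<String> mergeShards(List<List<String>> shards) {
    // Each queue entry is {shardIndex, position}, ordered by the value it points at.
    PriorityQueue<int[]> heads = new PriorityQueue<>(
        (x, y) -> shards.get(x[0]).get(x[1]).compareTo(shards.get(y[0]).get(y[1])));
    for (int s = 0; s < shards.size(); s++) {
      if (!shards.get(s).isEmpty()) {
        heads.add(new int[] {s, 0});
      }
    }

    List<String> out = new ArrayList<>();
    while (!heads.isEmpty()) {
      // Take the smallest head, emit it, then queue that shard's next element.
      int[] head = heads.poll();
      List<String> shard = shards.get(head[0]);
      out.add(shard.get(head[1]));
      if (head[1] + 1 < shard.size()) {
        heads.add(new int[] {head[0], head[1] + 1});
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> shards = Arrays.asList(
        Arrays.asList("a", "d"),
        Arrays.asList("b", "e"),
        Arrays.asList("c"));
    System.out.println(mergeShards(shards));
  }
}
```

The queue never holds more than one entry per shard, which is why the memory needed for the merge depends on the number of shards and not on the size of the result set.
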
A Simple Example
import org.apache.solr.client.solrj.io.*;
import java.io.IOException;
import java.util.*;

public class StreamingClient {

   public static void main(String args[]) throws IOException {
      String zkHost = args[0];
      String collection = args[1];

      Map props = new HashMap();
      props.put("q", "*:*");
      props.put("qt", "/export");
      props.put("sort", "fieldA asc");
      props.put("fl", "fieldA,fieldB,fieldC");
      
      CloudSolrStream cstream = new CloudSolrStream(zkHost, 
                                                    collection, 
                                                    props);
      try {
       
        cstream.open();
        while(true) {
          
          Tuple tuple = cstream.read();
          if(tuple.EOF) {
             break;
          }

          String fieldA = tuple.getString("fieldA");
          String fieldB = tuple.getString("fieldB");
          String fieldC = tuple.getString("fieldC");
          System.out.println(fieldA + ", " + fieldB + ", " + fieldC);
        }
      
      } finally {
       cstream.close();
      }
   }
}

The example above does the following things:
  1. Creates a map with query parameters. Notice the "qt" parameter is set to "/export". This will cause the search to be handled by the /export handler, which is designed to sort and export entire result sets.
  2. Constructs a CloudSolrStream passing it the zkHost, collection and query parameters.
  3. Opens the stream and reads the Tuples until a Tuple with the EOF property set to true is encountered. This EOF Tuple signifies the end of the stream.
  4. For each Tuple fieldA, fieldB and fieldC are read. These fields are present because they are specified in the field list (fl) query parameter.
  5. The Tuples will be returned in ascending order of fieldA. This is specified by the sort parameter.

Streamed Results

It's important to understand that the Tuples in the example are streamed. This means that the example can handle result sets with millions of documents without running into memory issues.

TupleStream Decorators 

The code sample shows a simple iteration of Tuples. What if we want to do something more exciting with the stream of Tuples?

TupleStream Decorators can be used to perform operations on TupleStreams. A TupleStream Decorator wraps one or more TupleStreams and performs operations on the Tuples as they are read. 

In the next blog we'll see how TupleStream Decorators can be used to both transform TupleStreams and gather metrics on TupleStreams.

