Monday, October 31, 2016

Solr 6.3: Batch jobs, Parallel ETL and Streaming Text Transformation

Solr 6.3 is on it's way and along with it comes a new execution paradigm for Solr: Parallel batch. Solr's new batch capabilities open up a new world of use cases for Solr. This blog will cover the basics of how the parallel batch framework operates and describe how it can be used to perform parallel ETL and parallel text transformations.

Not built on MapReduce


Solr's Streaming Expressions have had MapReduce capabilities for quite a while now. But Solr's MapReduce implementation is designed to support interactive queries over large data sets and to power the Parallel SQL interface when run in MapReduce mode.

And notably Solr's MapReduce implementation does not support streaming of text fields, which makes it unsuitable for performing ETL and text transformations in Solr.

Parallel batch is built on message queues


Solr also has massaging capabilities that allow an entire SolrCloud collection to be treated like a message queue, similar in nature to Apache Kafka. While Apache Kafka is more scalable, Solr's messaging queue is more flexible in that it allows you to subscribe to a query.

The Streaming Expression that allows you to subscribe to a query is the topic expression. Here is the basic syntax:

topic(checkpointCollection,
          dataCollection,
          q="*:*",
          fl="from, to, body",
          id="myTopic",
          rows="300",
          initialCheckpoint="0")

Here is an explanation of the parameters:

  1. checkpointCollection: The topic function tracks the checkpoints for a specific topic id. It stores the checkpoints in this collection.
  2. dataCollection: This is the collection that the topic results are pulled from.
  3. q: The query to use to pull records for this topic.
  4. id: The unique id of the topic. A different set of checkpoints will be maintained for each unique topic id.
  5. rows: The number of rows to fetch from each shard, each time the topic function is called.
  6. initialCheckpoint: Where in the queue to start fetching results from. Setting to 0 will cause the topic to match all the records that match the topic query in the collection. Not setting the initialCheckpoint will cause the topic to begin fetching records that have been added after the topic has been initiated. 
When the topic function is sent to the /stream handler it will retrieve a batch of rows from the topic and update the checkpoints for the topic. The next time it's called it will retrieve the next batch of rows.

The topic function has no restriction on the data that can be retrieved. So it can return any stored fields including stored text fields.


Iterating the topic with a daemon


In order to process all the records in a topic, we will need to call the topic repeatedly until it stops returning results. The daemon function can do this for us.

The daemon function wraps another function and calls it at intervals using an internal thread. When a daemon is passed to the /stream handler, the /stream handler recognizes it and keeps it in memory so that it can run until it's completed it's job.

Here is the basic syntax:

daemon(id="myDaemon",
               terminate="true",
               topic(checkpointCollection,
                        dataCollection,
                        q="*:*",
                        fl="from, to, body",
                        id="myTopic",
                        rows="300",
                        initialCheckpoint="0"))

Notice the terminate parameter, which is new in Solr 6.3. This tells the daemon to terminate after the topic has no more records to process.

Sending Tuples to another SolrCloud collection


As the daemon iterates a topic it can send the results to another SolrCloud collection using the update function.

Here is the basic syntax:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           topic(checkpointCollection,
                                     dataCollection,
                                     q="*:*",
                                     fl="from, to, body",
                                     id="myTopic",
                                     rows="300",
                                     initialCheckpoint="0")))

The example above is sending all the Tuples that are emitted by the topic to another SolrCloud collection.


Performing transformations on the Tuples


The data in the Tuples emitted by the topic can be adjusted/transformed before they are sent to the destinationCollection. Here is a very simple example:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           select(from as from_s,
                                      to as to_s,
                                      body as body_t,
                                      topic(checkpointCollection,
                                                dataCollection,
                                                q="*:*",
                                                fl="from, to, body",
                                                id="myTopic",
                                                rows="300",
                                                initialCheckpoint="0"))))

In the example above the select function is changing the field names in the Tuples before the tuples are processed by the update function.


Text analysis and transformation


Starting with Solr 6.3 Streaming Expressions have access to Lucene/Solr's text analyzers. This allows functions to be written that process text using the full power of these analyzers. The first example of this is the classify funtion in Solr 6.3, which analyzes text in a text field and extracts the features needed to perform classification using a linear classifier. This deserves an entire blog in it's own right so it's only mentioned here as an example of how Streaming Expressions can use analyzers.

Also in Solr 6.3 you can add your own Stream Expressions and register them in the solrconfig.xml. So you can write and plugin your own text analysis functions.


Fetching data from other collections with the fetch function


In Solr 6.3 the fetch function has been added that allows fields from other collections to be fetched and added to documents. Here is the sample syntax:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           fetch(userAddresses,
                                     on="from=userID",
                                     fl="address",
                                     topic(checkpointCollection,
                                                dataCollection,
                                                q="*:*",
                                                fl="from, to, body",
                                                id="myTopic",
                                                rows="300",
                                                initialCheckpoint="0"))))

The example above fetches the address from the usersAddresses collection, by mapping the from field in the Tuples to the userID in the userAddresses collection.

Parallel batch processing


The parallel function can be used to partition a batch job across a cluster of worker nodes. Here is the basic syntax:

parallel(workerCollection,
               workers=10,
               sort="DaemonOp desc",
               daemon(id="myDaemon",
                              terminate="true",
                              update(destinationCollection,
                                           batchSize=300,
                                           select(from as from_s,
                                                      to as to_s,
                                                      body as body_t,
                                                      topic(checkpointCollection,
                                                                dataCollection,
                                                                q="*:*",
                                                                fl="id, from, to, body",
                                                                id="myTopic",
                                                                rows="300",
                                                                initialCheckpoint="0",
                                                                partitionKeys="id")))))

In this example the parallel function sends the daemon to 10 worker nodes. Each worker will process a partition of the topic. Notice that the partitionKeys field has been added to the topic. This tells the topic to hash partition on the id field in the dataCollection.

Quick note about the DaemonOp sort parameter. The parallel function sends the daemon to 10 worker nodes. The worker nodes return a Tuple that confirms that a daemon operation was started. The DaemonOp is simply the confirmation Tuple. The parallel function never sees the tuples generated by the topic function as they are sent to another SolrCloud collection.


Parallel batch throughput


When performing parallel batch operations, each worker will be iterating over the topic in parallel. The topic function randomly selects a replica from each shard to query for it's data. So when performing parallel batch operations all of the replicas in the cluster will be streaming content at once.    

Feature Scaling with Solr Streaming Expressions

Before performing machine learning operations its often important to scale the feature vectors so they can be compared at the same scale. In...