Monday, October 31, 2016

Solr 6.3: Batch jobs, Parallel ETL and Streaming Text Transformation

Solr 6.3 is on its way, and along with it comes a new execution paradigm for Solr: parallel batch. Solr's new batch capabilities open up a new world of use cases. This blog covers the basics of how the parallel batch framework operates and describes how it can be used to perform parallel ETL and parallel text transformations.

Not built on MapReduce


Solr's Streaming Expressions have had MapReduce capabilities for quite a while now. But Solr's MapReduce implementation is designed to support interactive queries over large data sets and to power the Parallel SQL interface when run in MapReduce mode.

Notably, Solr's MapReduce implementation does not support streaming of text fields, which makes it unsuitable for performing ETL and text transformations.

Parallel batch is built on message queues


Solr also has messaging capabilities that allow an entire SolrCloud collection to be treated like a message queue, similar in nature to Apache Kafka. While Apache Kafka is more scalable, Solr's message queue is more flexible in that it allows you to subscribe to a query.

The Streaming Expression that allows you to subscribe to a query is the topic expression. Here is the basic syntax:

topic(checkpointCollection,
          dataCollection,
          q="*:*",
          fl="from, to, body",
          id="myTopic",
          rows="300",
          initialCheckpoint="0")

Here is an explanation of the parameters:

  1. checkpointCollection: The topic function tracks the checkpoints for a specific topic id. It stores the checkpoints in this collection.
  2. dataCollection: This is the collection that the topic results are pulled from.
  3. q: The query to use to pull records for this topic.
  4. id: The unique id of the topic. A different set of checkpoints will be maintained for each unique topic id.
  5. rows: The number of rows to fetch from each shard, each time the topic function is called.
  6. initialCheckpoint: Where in the queue to start fetching results from. Setting this to 0 causes the topic to return every record in the collection that matches the topic query. Leaving initialCheckpoint unset causes the topic to fetch only records added after the topic was initiated.

When the topic function is sent to the /stream handler it retrieves a batch of rows from the topic and updates the checkpoints for the topic. The next time it's called it retrieves the next batch of rows.

The topic function has no restriction on the data that can be retrieved, so it can return any stored field, including stored text fields.
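
An expression like the one above can be posted directly to the /stream handler of a SolrCloud collection. Here is a minimal sketch, assuming Solr is running on localhost:8983 and using the same collection names as above:

curl --data-urlencode 'expr=topic(checkpointCollection,
                                  dataCollection,
                                  q="*:*",
                                  fl="from, to, body",
                                  id="myTopic",
                                  rows="300",
                                  initialCheckpoint="0")' \
     http://localhost:8983/solr/dataCollection/stream

Each call returns one batch of Tuples and advances the checkpoints, so repeating the call walks through the queue.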


Iterating the topic with a daemon


In order to process all the records in a topic, we will need to call the topic repeatedly until it stops returning results. The daemon function can do this for us.

The daemon function wraps another function and calls it at intervals using an internal thread. When a daemon is passed to the /stream handler, the /stream handler recognizes it and keeps it in memory so that it can run until it has completed its job.

Here is the basic syntax:

daemon(id="myDaemon",
               terminate="true",
               topic(checkpointCollection,
                        dataCollection,
                        q="*:*",
                        fl="from, to, body",
                        id="myTopic",
                        rows="300",
                        initialCheckpoint="0"))

Notice the terminate parameter, which is new in Solr 6.3. This tells the daemon to terminate after the topic has no more records to process.
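
Daemons running inside the /stream handler can also be listed and controlled over HTTP. The sketch below assumes the daemon above was started against dataCollection on localhost:8983; check the Reference Guide for the exact daemon management actions supported by your version:

# list the daemons registered with this node's /stream handler
curl "http://localhost:8983/solr/dataCollection/stream?action=list"

# stop a running daemon by id
curl "http://localhost:8983/solr/dataCollection/stream?action=stop&id=myDaemon"

# remove the daemon entirely
curl "http://localhost:8983/solr/dataCollection/stream?action=kill&id=myDaemon"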

Sending Tuples to another SolrCloud collection


As the daemon iterates a topic it can send the results to another SolrCloud collection using the update function.

Here is the basic syntax:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           topic(checkpointCollection,
                                     dataCollection,
                                     q="*:*",
                                     fl="from, to, body",
                                     id="myTopic",
                                     rows="300",
                                     initialCheckpoint="0")))

The example above sends all the Tuples emitted by the topic to another SolrCloud collection.


Performing transformations on the Tuples


The data in the Tuples emitted by the topic can be adjusted/transformed before they are sent to the destinationCollection. Here is a very simple example:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           select(from as from_s,
                                      to as to_s,
                                      body as body_t,
                                      topic(checkpointCollection,
                                                dataCollection,
                                                q="*:*",
                                                fl="from, to, body",
                                                id="myTopic",
                                                rows="300",
                                                initialCheckpoint="0"))))

In the example above the select function changes the field names in the Tuples before they are processed by the update function.


Text analysis and transformation


Starting with Solr 6.3, Streaming Expressions have access to Lucene/Solr's text analyzers. This allows functions to be written that process text using the full power of these analyzers. The first example of this is the classify function in Solr 6.3, which analyzes text in a text field and extracts the features needed to perform classification using a linear classifier. This deserves an entire blog in its own right, so it's only mentioned here as an example of how Streaming Expressions can use analyzers.
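
As a rough illustration only (the models collection, model id and field names below are hypothetical), a classify call wraps a stored model and a stream of documents to score:

classify(model(models, id="myModel"),
              topic(checkpointCollection,
                        dataCollection,
                        q="*:*",
                        fl="id, body_t",
                        id="classifyTopic",
                        initialCheckpoint="0"),
              field="body_t")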

Also in Solr 6.3, you can add your own Streaming Expressions and register them in solrconfig.xml, so you can write and plug in your own text analysis functions.


Fetching data from other collections with the fetch function


Solr 6.3 adds the fetch function, which allows fields from another collection to be fetched and added to documents. Here is the sample syntax:

daemon(id="myDaemon",
               terminate="true",
               update(destinationCollection,
                           batchSize=300,
                           fetch(userAddresses,
                                     on="from=userID",
                                     fl="address",
                                     topic(checkpointCollection,
                                                dataCollection,
                                                q="*:*",
                                                fl="from, to, body",
                                                id="myTopic",
                                                rows="300",
                                                initialCheckpoint="0"))))

The example above fetches the address field from the userAddresses collection by mapping the from field in the Tuples to the userID field in the userAddresses collection.

Parallel batch processing


The parallel function can be used to partition a batch job across a cluster of worker nodes. Here is the basic syntax:

parallel(workerCollection,
               workers=10,
               sort="DaemonOp desc",
               daemon(id="myDaemon",
                              terminate="true",
                              update(destinationCollection,
                                           batchSize=300,
                                           select(from as from_s,
                                                      to as to_s,
                                                      body as body_t,
                                                      topic(checkpointCollection,
                                                                dataCollection,
                                                                q="*:*",
                                                                fl="id, from, to, body",
                                                                id="myTopic",
                                                                rows="300",
                                                                initialCheckpoint="0",
                                                                partitionKeys="id")))))

In this example the parallel function sends the daemon to 10 worker nodes, and each worker processes a partition of the topic. Notice that the partitionKeys parameter has been added to the topic. This tells the topic to hash partition the results on the id field in the dataCollection.

A quick note about the sort="DaemonOp desc" parameter: the parallel function sends the daemon to 10 worker nodes, and each worker returns a Tuple confirming that a daemon operation was started. The DaemonOp field is simply part of that confirmation Tuple. The parallel function never sees the Tuples generated by the topic function, because they are sent directly to another SolrCloud collection.


Parallel batch throughput


When performing parallel batch operations, each worker iterates over its partition of the topic in parallel. The topic function randomly selects a replica from each shard to query for its data, so during parallel batch operations all of the replicas in the cluster will be streaming content at once.

Friday, August 19, 2016

Training and Storing Machine Learning Models with Solr 6.2

Solr's Streaming Expressions can already perform Parallel SQL, Graph Traversal, Messaging, MapReduce and Stream Processing. In Solr 6.2 Streaming Expressions adds machine learning capabilities.

Starting with Solr 6.2 you'll be able to train a Logistic Regression Text Classifier on training sets stored in Solr Cloud. This means Solr can now build machine learning models natively.

Now Solr can learn.

What is a Logistic Regression Text Classifier?


Logistic regression is a supervised machine learning classification algorithm used to train a model that classifies data. It is a binary classifier, which means it's used to determine whether something belongs to a class or not.

Logistic Regression is a supervised algorithm, which means it needs a training set that provides positive and negative examples of the class to build the model.

Logistic Regression models are trained using specific features that describe the data. With text classification the features of the data are the terms in the documents.

Once the model is trained it can be used to classify other documents based on their features (terms).

The model returns a score predicting the probability that a document is in the class or not.
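
For intuition, the score is the standard logistic function applied to a weighted sum of the document's features: p = 1 / (1 + e^-(w · x)), where x is the document's term-feature vector and w is the weight vector learned during training. A score near 1 means the document very likely belongs to the class, and a score near 0 means it very likely does not.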

Solr's Implementation Through Streaming Expressions


Solr 6.2 has two new Streaming Expression functions: features and train.

Features: Extracts the key terms from a training set stored in a Solr Cloud collection. The features function uses an algorithm called Information Gain to determine which terms are most important for the specific training set.

Train: Uses the extracted features to train the Logistic Regression model. The train function uses a parallel, iterative batch Gradient Descent approach to optimize the model. The algorithm is embedded in Solr's core, so only the model is streamed across the network with each iteration.
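
To give a feel for the shape of these functions, here is a rough sketch of a features call; the collection, field and outcome names are hypothetical and the parameter list may vary slightly by version:

features(trainingCollection,
              q="*:*",
              featureSet="emailFeatures",
              field="body",
              outcome="out_i",
              numTerms=250)

The outcome field holds the positive/negative label for each training document, and numTerms controls how many high information gain terms are extracted.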


Storing Features and Models



The output from both the features and train functions can be redirected to another SolrCloud collection using the update function.

This approach allows Solr to store millions of models that can be easily retrieved and deployed.
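
For example, a train call can be nested inside update so that the model Tuples it emits are written to a models collection. Again, a sketch with hypothetical names:

update(models,
            batchSize=5,
            train(trainingCollection,
                      features(trainingCollection,
                                    q="*:*",
                                    featureSet="emailFeatures",
                                    field="body",
                                    outcome="out_i",
                                    numTerms=250),
                      q="*:*",
                      name="emailModel",
                      field="body",
                      outcome="out_i",
                      maxIterations=100))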


Learning What Users Like


One of the interesting use cases for this feature is to build a model for every user that encapsulates what that user likes to read.

In order to build a model for each user, we need to pull together positive and negative data sets for each user. The positive set includes documents that the user has an interest in and the negative set contains documents the user has not shown a particular interest in.


Using Graph Expressions To Build The Positive Set


Application usage logs can be queried to find what the user likes to read (positive set). Indexing the usage logs in Solr Cloud allows us to run graph queries that identify the positive set.

Graph queries can be run that return a set of documents that the user has read. Graph queries can also be used to expand the training set. For example the training set can be expanded to include the documents that are most frequently viewed in the same session with documents that the user has viewed.

Graph queries can also be used to expand the positive training set through collaborative filtering. Using this approach documents read by users that have similar reading habits to the user can be pulled into the positive set.


Collecting the Negative Training Set


Logistic Regression requires a negative training set which can be gathered by down sampling the main corpus. This means taking a random sample from the main corpus that is similar in size to the positive training set.

The random Streaming Expression can be used for down sampling; it returns a random set of documents that match a query.

Low frequency terms from the positive training set can be extracted and used to filter documents out of the random negative training set. This helps ensure that the negative training set doesn't include key terms from the positive training set.
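
A rough sketch of such a down-sampled negative set, with hypothetical collection, field and term names:

random(mainCorpus,
            q="*:* -body:(rareTermA rareTermB)",
            rows="5000",
            fl="id, body")

The negative clause keeps documents containing the excluded positive-set terms out of the random sample.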

Using Easy Ensemble to Provide Larger Samples of Negative Data


If the negative training set is too small to represent the main corpus an approach known as Easy Ensemble can be used to expand the sample size of the negative set.

With Easy Ensemble multiple down sampled negative sets are collected and the positive training set is trained against each negative set. This will train an ensemble of models that can be used to classify documents.

With the ensemble approach, the outputs from all the models are averaged to arrive at an ensemble classification.


Automating the Building of Training Sets With Daemons


Solr 6 introduced daemons. Daemons are Streaming Expressions that live inside Solr and run in the background at intervals.

Daemons can now be used to set up processes inside Solr that monitor the logs and build models for users as new data enters the logs.
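
For instance, a daemon can wrap the training pipeline and re-run it on a schedule using its runInterval parameter (milliseconds between runs). A sketch with hypothetical names, reusing the update/train shape sketched earlier:

daemon(id="user123Trainer",
              runInterval="86400000",
              update(models,
                          batchSize=5,
                          train(trainingCollection,
                                    features(trainingCollection,
                                                  q="*:*",
                                                  featureSet="user123Features",
                                                  field="body",
                                                  outcome="out_i",
                                                  numTerms=250),
                                    q="*:*",
                                    name="user123Model",
                                    field="body",
                                    outcome="out_i",
                                    maxIterations=100)))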

Once the daemon processes are in place Solr can learn on its own.

Future blogs will discuss how to set up daemons to monitor the logs and take action.


Re-Ranking and Recommending


Once the models are stored in a Solr Cloud collection they can be retrieved and used to score documents. The score reflects the probability that the user will like the document.

The scores can be applied in Solr's re-ranker to re-order the top N search results. This provides custom rankings for each user.

The scores can also be used to score recommendations created from graph expressions or More Like This queries. This provides personalized recommendations.
