Monday, January 23, 2017

Deploying an AI alerting system with Solr's Streaming Expressions

As of Solr 6.4, Streaming Expressions provide all the tools you need to deploy an Artificial Intelligence alerting system. Let's take a look at some of the tools involved and then walk through a simple AI alerting use case.


The Tools


1) A Text Classifier: Solr's Streaming Expressions allow you to train, store, and deploy a text classifier. With a text classifier you can train a model to determine the probability that a document belongs to a certain class. This provides the brains behind the alert.

2) A Messaging System: Solr's Streaming Expressions library provides the topic function, which allows clients to subscribe to a query. Once the subscription is established the topic provides one-time delivery of documents that match the topic query. This provides the nervous system of the alerting engine.

3) Actors: Solr's Streaming Expressions library provides the daemon function, which are processes that live inside Solr. Daemon's have the ability to subscribe to topics, apply a classifier, update collections and take other actions. This provides the muscle behind the alerts.

Use Case: A Threat Alerting Engine


Let's walk through the steps for building an engine that detects threats in social media posts and sends alerts.

Step 1: Train a model that classifies threats


To train the model we need to assemble a training set comprised of positive and negative examples of the class. In this case we need to gather a set of social media posts that are threats and another set of social media posts that are not threats.

Once we have our training set we can load the data into a Solr Cloud collection. Then we can use the featurestrain and update functions to extract the key features, train the model (based on the features) and store the model iterations in a Solr Cloud collection :

update(models,
             batchSize="50",
             train(trainingSet,
                      features(trainingSet,
                                     q="*:*",
                                     featureSet="threatFeatures",
                                     field="body",
                                     outcome="out_i",
                                     numTerms=250),
                      q="*:*",
                      name="threatModel",
                      field="body",
                      outcome="out_i",
                      maxIterations="100"))



Let's explore this expression in more detail.

The features function extracts the key features (terms) from the training data. The features function scores terms using a technique known as Information Gain to determine which features are the most important in distinguishing the positive and negative set. The features function also extracts the term statistics from the training set needed to build the model. The online documentation contains more detailed information for the features function.

The train function uses the terms and statistics emitted by the features function to train the model. The train function use a parallel iterative, batch gradient descent approach to train a logistic regression text classifier. The train function emits a tuple for each iteration of the model. Each model tuple includes the terms, weights, error rate and a confusion matrix that describes the classification errors for the iteration. The online documentation contains more detailed information for the train function.

The update function is used to store each iteration of the model in a Solr Cloud collection. The online documentation contains more detailed information for the update function. 

Step 2: Inspect and test the model


The next step is to validate that you have a good model. A quick inspection of the model iterations can be useful in understanding the model. Here are a few things to look for:

a) Look at the terms in the model. A quick overview of the terms can be useful in understanding what the key features are in the model.

b) Look at weights for the terms to get a feel for how the terms are weighted in the model.

c) Look at the confusion matrix for the last iteration of the model to get a feeling for error rates that occurred in the final training iteration.

d) Look at the iterations to see if the model converged, which means the error rates gradually reduced until they reach a point of stabilization.

After reviewing the model we can test the model on a test dataset. The test dataset should consist of positive and negative examples of the class and should not be the same as the training dataset.

The expression below runs the classifier on the test set:

classify(model(models,
                          id="threatModel",
                          cacheMillis=5000),
              search(testData,
                          q="*:*",
                          fl="id,  body",
                          sort="id desc"),
              field="body")

Let's breakdown the expression above.

The classify function calls the model function to retrieve a named model stored in a Solr Cloud collection. In the example above it retrieves the threatModel which we built earlier. The model function retrieves the highest iteration of the model found in the collection and caches it in memory for a specified period of time. For more detailed information on the classify, model and search functions you can review the documentation.

Once the classify function has the model, it reads tuples from an internal stream and scores the tuples by applying the model to a text field in the document. The classify function emits all the tuples from the underlying stream and adds a field called probability_d, which is a score between 0 and 1 that describes the probability that the tuple is in the class. The higher the score, the higher the probability. You can start with the score of .5 or greater as the threshold for positive or negative class assignment.

You can then read the tuples to see how they were scored and determine how accurate the classifier is. If there are false positives or false negatives in the test results then inspect the records to see why they might have been misclassified. If there are missing features in the training set you may need to add more examples that include these missing features and rebuild the model.

You can also adjust the threshold to see if it creates a more accurate classifier. For example if changing the threshold from .5 to .6 provides fewer false positives without increasing false negatives then you can use that as the threshold for the classifier when deployed.

Step 3: Setup the Actor


Once you're satisfied with the model, its time to create an actor that can read new documents as they enter the index, classify them and index the threats in a separate collection.

To do this we'll setup a daemon to run the classify function. But instead of using a search function as the internal stream to classify, we'll use a topic.

Here is the basic syntax for this:

daemon(id="alertDaemon",
              update(threats,
                          batchSize="10",
                          having(classify(model(models,
                                                                id="threatModel",
                                                                cacheMillis="5000"),
                                                     topic(messages,
                                                               checkpoints,
                                                               id="messageTopic",
                                                               q="*:*",
                                                               fl="id, body"),
                                                     field="body"),
                                         gt(probability_d, 0.5)
                                       )
                           )
             )


Let's explore this expression from inside-out.

The inner topic expression is subscribing to the messages collection. The messages collection holds the social media messages we are alerting on. The topic will provide one time delivery of documents in this collection. Each time it is called it will return a new batch of documents.

The classify function wraps the topic and scores each tuple based on the model retrieved by the internal model function. The classify function emits the tuples from the topic and adds the class score to the outgoing tuples in the probability_d field.

The having expression wraps the classify function and only emits tuples with a probability_d greater than 0.5. The having function is setting the classification threshold for the alert.  The online documentation contains more detailed information for the having function.

The update function indexes the tuples emitted by the having function into a Solr Cloud collection called threats. The threats collection is where the messages classified as threats are stored.

The daemon function wraps the update function and calls it at intervals using an internal thread. Each time the daemon runs, a new batch of messages will be classified and the threats will be added to the threats collection. New records added to the messages collection will automatically be classified in the order they were added. The online documentation contains more detailed information for the daemon function.


Step 4: Adding actors to watch the threats collection


We now have a threats collection where the threats are being indexed. Any number of actors can subscribe to the threats collection using a topic function or TopicStream java class. These actors can be daemons running inside Solr or actors external to Solr.

In this scenario the threats collection is used as a message queue for other actors. These actors can then be programmed to take specific actions based on the threat.


Wednesday, January 18, 2017

Deploying Solr's New Parallel Executor

In an a recent blog we explored Solr's new parallel batch capabilities. This blog expands on parallel batch and introduces Solr's new parallel executor. Solr's parallel executor allows Streaming Expressions to be stored in a Solr Cloud collection where they can be streamed to worker nodes and executed in parallel.


The executor Function


The new executor function performs the compilation and execution of Streaming Expressions on worker nodes. The executor function has an internal thread pool that executes Streaming Expressions in parallel within a single worker node. The queue of streaming expressions can also be partitioned across a cluster of worker nodes providing a second level of parallelism.


Deploying a General Purpose Work Queue


The executor function can be used with the daemon and topic functions to deploy a general purpose work queue. An example of this expression construct is below:

daemon(id="daemon1",
               executor(threads=5,
                               topic(checkpointCollection,
                                         storedExpressions,
                                         id="topic1",
                                         initialCheckpoint=0,
                                         q="*:*",
                                         fl="id, expr_s")))

Let's break down the expression above starting with the topic function.

The topic function subscribes to a query and provides one-time delivery of documents that match the query. In the example, the topic function is subscribed to a collection of stored Streaming Expressions.

The executor function wraps the topic and for each tuple it compiles and runs the expression in the expr_s field. The executor has an internal thread pool and each expression is compiled and run in its own thread. The threads parameter controls the size of the thread pool.

The daemon function wraps the executor and calls it at intervals using an internal thread. This will cause the executor to iterate over the topic and execute all the Streaming Expressions in the work queue in batches.

The daemon function will continue to run at intervals when the queue is empty. As new tasks are indexed into the queue they will automatically be read by the topic and executed.

Prioritizing Tasks with the priority Function  


In the example above, the executor will run tasks in the order that they are emitted by the topic. Topic's emit tuples ordered by Solr's internal _version_ number. This behaves similar to a FIFO queue (but without strict FIFO enforcement). But the topic alone doesn't have any concept of task prioritization.

The priority function can be used to allow higher priority tasks to be scheduled ahead of lower priority tasks. The priority function wraps two topics. The first topic is the higher priority queue and the second topic is the lower priority queue. The priority function will only emit a lower priority task when there are no higher priority tasks in the queue.

daemon(id="daemon1",
               executor(threads=5,
                               priority(topic(checkpointCollection,
                                                      highPriorityTasks,
                                                      id="high",
                                                      initialCheckpoint=0,
                                                      q="*:*",
                                                     fl="id, expr_s"),
                                             topic(checkpointCollection,
                                                       lowPriorityTasks,
                                                       id="low",
                                                       initialCheckpoint=0,
                                                       q="*:*",
                                                      fl="id, expr_s")))


Deploying a Parallel Work Queue


The parallel function can be used to partition tasks across a worker collection. This provides parallel execution within a single worker and across a cluster of workers. The syntax for deploying a parallel work queue is below:

parallel(workerCollection,
              workers=6,
              sort="DaemonOp asc",
              daemon(id="daemon1",
                             executor(threads=5,
                                             topic(checkpointCollection,
                                                       storedExpressions,
                                                       id="topic1",
                                                       initialCheckpoint=0,
                                                       q="*:*",
                                                       fl="id, expr_s",
                                                       partitionKeys="id"))))

In the example above the parallel function sends daemons to 6 workers. Each worker executes a partition of the work queue.

Deploying Replicas to Increase Cluster Capacity


Expressions run by the executor search Solr Cloud collections to retrieve data. These searches will be spread across all of the replicas in the Solr Cloud collections. As the number of workers executing expressions increases, more replicas can be added to the Solr Cloud collections to increase the capacity of the entire system.

Thursday, January 12, 2017

Solr 6.3: Finding the most relevant facets with the scoreNodes Streaming Expression

Starting with Solr 6.3 you can use the scoreNodes Streaming Expression to find the most relevant facets and significant relationships in a distributed graph. This blog describes how the scoreNodes function can be used with facets. A future blog will cover using scoreNodes with graph expressions.

Why Score Facets?

One typical use case for scoring facets would be for lightning fast recommendations based on market basket co-occurrence. We'll explore this scenario below:

First let's look at the syntax for scoring facets:

scoreNodes(facet(baskets,
                             q="products:A",
                             buckets="products",
                             bucketSorts="count(*) desc",
                             bucketSizeLimit=50,
                             count(*)))

Let's breakdown what the expression is doing.

The facet expression calls Solr's JSON facet API and emits tuples which contain the facet results. In this case it is searching the baskets collection. The query is looking for all records in the baskets collection that have product A in the products field.

The baskets collection contains a multi-valued field called products which contains all the products in the basket. For example

id                 products
basket1        [A, B, C]
basket2        [A, C, E]
basket3        [B, C, D]

The sample facet expression will return the following tuples:

products:  C
count(*):  2

products:  B
count(*):  1

products: E
count(*): 1

Product C is in two baskets with product A. Products B and E are both in one basket with product A.

So it would seem that the most relevant facet/product for product A would be product C.

But, there is something we don't know yet. How often product C occurs in all the baskets. If product C occurs in a large percentage of baskets, then it doesn't have any particular relevance to product A.

This is where the scoreNodes function does it's magic. The scoreNodes function scores the facets based on the raw facet counts and their frequency across the entire collection.

The scoring formula is similar to the tf*idf scoring algorithm used to score results from a full text search. In the full text context tf (term frequency) is the number of times the term appears in the document. idf (inverse document frequency) is computed based on the document frequency of the term, or how many documents the term appears in. The idf is used to provide a boost to terms that are more rare in the index.

scoreNodes uses the same  principal to score facets, but the facet count is used as the tf value in the formula. The idf is computed for each facet term based on global statistics across the entire collection. The effect of the scoreNodes algorithm is to provide a boost to facet terms that are rarer in the collection.

The scoreNodes functions adds a field to each facet tuple called nodeScore, which is the relevance score for the facet. You can use the top expression to find the most relevant facet:

top(n=2, sort="nodeScore desc",
      scoreNodes(facet(baskets,
                                   q="products:A",
                                   buckets="products",
                                   bucketSorts="count(*) desc",
                                   bucketSizeLimit=50,
                                   count(*)))

The expression above emits the two highest scoring facets based on the nodeScore.

Solr temporal graph queries for event correlation, root cause analysis and temporal anomaly detection

Temporal graph queries will be available in the 8.9 release of Apache Solr. Temporal graph queries are designed for key log analytics use ...