Getting Up to Speed on Real-Time Machine Learning with Spark and SBERT

ODSC - Open Data Science
8 min read · Jun 6, 2023

Editor’s note: Dillon Bostwick and Avinash Sooriyarachchi are speakers for ODSC Europe 2023 this June 14th-15th. Be sure to check out their talk, “Getting Up to Speed on Real-Time Machine Learning,” there!

The benefits of real-time machine learning are becoming increasingly apparent. Digital native companies have long proven that use cases like fraud detection, recommendation systems, and dynamic pricing benefit from lower processing latencies. In a recent KDD paper, Booking.com found that even a 30% increase in model serving latency caused roughly a 0.5% decrease in user conversion, a significant cost to their business.

While real-time machine learning presents many opportunities, few teams successfully serve production-grade machine learning models in real-time, and most struggle to deliver the feature freshness needed for low-latency inference. This is due to a deep disconnect between data engineering and data science practices. Historically, our space has perceived streaming as a complex technology reserved for experienced data engineers with a deep understanding of incremental event processing. But now, modern streaming platforms make it much easier for anyone to build reliable streaming pipelines, regardless of their streaming background.

Anomaly detection, including fraud detection and network intrusion monitoring, particularly exemplifies the challenges of real-time machine learning. In these use cases, the ability to detect various drifts in the data has an immediate impact on business risk. Models must be continuously retrained or fine-tuned to adjust quickly to various forms of drift, and predictions generally must be made at low latency to provide an optimal customer experience. In adversarial scenarios like e-commerce fraud, failure to adapt the model to new attack patterns can mean failing to catch the roughly 3% of transactions that are fraudulent.

Using Embeddings to Detect Anomalies

Figure 1: Using a trained deep neural network, it is possible to convert unstructured data to numeric representations, i.e., embeddings

Embeddings are numerical representations generated from unstructured data like images, text, and audio, and greatly influence machine learning approaches for handling such data. While embeddings have become a popular way to represent unstructured data, they can also be generated for categorical and numeric variables in tabular datasets. This allows multi-modal unstructured and structured data to be embedded in a common embedding space, where similar observations are located close to each other and dissimilar ones are far apart.
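As a minimal sketch of this "similar items are close" property, we can compare sentence embeddings directly with the same all-mpnet-base-v2 model we use later in this post (the example sentences are invented for illustration):

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
sentences = ["The payment was declined",
             "The transaction failed",
             "I love hiking in the mountains"]
embeddings = model.encode(sentences)

# Cosine similarity: the two payment-related sentences score much higher
# with each other than either does with the unrelated sentence
print(util.cos_sim(embeddings[0], embeddings[1]))  # high
print(util.cos_sim(embeddings[0], embeddings[2]))  # low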

Figure 2: In an embedding space, embeddings of similar items lie close together, while dissimilar ones are far apart.

Multi-modal embeddings, such as those produced by Meta’s ImageBind model, are an emerging trend in which neural network architectures map data from diverse modalities into a shared embedding space. This approach offers a fresh perspective on anomaly detection problems. For instance, outliers can be detected within an embedding space involving image, text, and tabular data.

With this in mind, we can use a real-time event streaming framework to monitor embedding drift across features to determine underlying concept drift or detect anomalous events. To demonstrate the power of combining streaming with embeddings, we will present an example that uses Apache Spark Structured Streaming together with SBERT sentence transformers to detect embedding drift in real time and find hidden anomalies faster and more accurately. This is an exciting combination of two powerful technologies traditionally used by disparate personas in an organization, and it can be further applied to multi-modal embeddings to find novel patterns throughout an organization's data that previously would have gone unnoticed.

Using Spark Structured Streaming to Process Embeddings in Real-Time

We will rely heavily on Apache Spark Structured Streaming's high-level semantics for stateful processing. Spark supports three different kinds of window operations over event time:

  • Tumbling windows: fixed-size, non-overlapping, contiguous intervals
  • Sliding windows: fixed-size windows that overlap when the slide interval is shorter than the window length
  • Session windows: dynamically sized windows that stay open as long as events keep arriving within a gap duration
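In PySpark, the three look like the following sketch (df and the event-time column ts are hypothetical placeholders):

from pyspark.sql.functions import window, session_window

# Tumbling: fixed 10-minute buckets, no overlap
df.groupBy(window('ts', '10 minutes')).count()

# Sliding: 10-minute windows that start every 5 minutes, so an event
# can fall into two overlapping windows
df.groupBy(window('ts', '10 minutes', '5 minutes')).count()

# Session: a window stays open while events keep arriving within a
# 5-minute gap of each other
df.groupBy(session_window('ts', '5 minutes')).count()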

We can combine one of these event-time window functions with applyInPandas(), a Spark feature introduced in version 3.0. This function makes it easy to define custom aggregation functions in Python. When combined with event-time windows, analyzing the embeddings in real time becomes much more feasible.
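As a minimal illustration of the applyInPandas() pattern (the DataFrame df and its group_id and value columns are invented for this sketch), each group arrives as an ordinary Pandas DataFrame and the function returns one:

import pandas as pd

def center(pdf: pd.DataFrame) -> pd.DataFrame:
    # Subtract the group mean from each value within the group
    pdf['centered'] = pdf['value'] - pdf['value'].mean()
    return pdf

df.groupBy('group_id').applyInPandas(
    center, schema='group_id string, value double, centered double')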

When combining these capabilities, we can see how anomalous an event is relative to other events that happened shortly before, or even shortly after, it occurred, thanks to Structured Streaming's stateful processing capabilities. This means we no longer need a static ground truth: anomalous behavior can be determined by unsupervised models with no training phase. Spark Structured Streaming provides the semantics and guarantees needed to do this, and the net result is a more accurate, more robust fraud detection framework that can reduce losses by millions of dollars.

Anomaly Detection with Real-Time Embeddings

Imagine we have some text, images, or tabular data arriving at a Delta Lake table every few seconds. Even though the data changes continuously, we can perform a consistent static query on the current state of the table with Databricks. In this example, we will use text for simplicity, but we can easily extend this to multi-modal embeddings.
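For instance, a point-in-time snapshot of the raw_records table we stream from below is just an ordinary batch read:

# A consistent snapshot of the table, even as new files land every few seconds
spark.read.table('raw_records').limit(5).show()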

First, we want to use Spark’s Pandas UDF to apply SBERT, a state-of-the-art sentence encoder, on the real-time stream of text. Here, the Pandas UDF simplifies the hand-off between complex distributed event streaming and locally scoped Python functions.

Once we are working with the Pandas series, we simply grab the SBERT model from the HuggingFace Model Hub:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
from sentence_transformers import SentenceTransformer

@pandas_udf(ArrayType(DoubleType()))
def get_embeddings(texts: pd.Series) -> pd.Series:
    model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
    embeddings = model.encode(texts.to_numpy())
    return pd.Series(embeddings.tolist())

Next, we’ll use Spark Structured Streaming to read the raw data and create embeddings from the text column. If using Databricks, this is all we need to do to get up and running with a real-time stream of embeddings:

embedding_stream = spark.readStream \
    .table('raw_records') \
    .withColumn('embeddings', get_embeddings('text'))

Now, we want to define a function that receives and returns a Pandas DataFrame (pdf). Spark provides this abstraction layer so a data engineer can hand this interface to an ML engineer to implement. In this function, we cluster the embeddings with KMeans, then calculate the Euclidean distance from each embedding to the centroid of its assigned cluster to determine how far the event sits from its nearest "normal" neighborhood. The ML engineer could use this interface to experiment further with the number of clusters or Nearest Neighbor search, or try different distance metrics like cosine distance:

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances

K = 3  # Number of clusters; a tuning parameter chosen here for illustration

def get_dists_to_mean(pdf):
    # Stack embeddings into a 2D array, one row per event
    embeddings = np.vstack(pdf["embeddings"].values)
    # Cluster the embeddings within this window
    kmeans = KMeans(n_clusters=K, random_state=42)
    cluster_labels = kmeans.fit_predict(embeddings)
    cluster_centers = kmeans.cluster_centers_
    dist_to_cluster_mean = []
    for i in range(len(embeddings)):
        # Euclidean distance from this embedding to its cluster's centroid
        cluster_idx = cluster_labels[i]
        dist_to_cluster_mean.append(
            pairwise_distances(
                [embeddings[i]],
                [cluster_centers[cluster_idx]]).min())
    pdf["kmeans_dist"] = dist_to_cluster_mean
    return pdf
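As one example of the experimentation mentioned above, swapping the distance metric is a small change. Here is a hedged sketch of a cosine-distance variant (the helper name cosine_dists_to_centers is ours, not part of the pipeline above):

import numpy as np
from sklearn.metrics import pairwise_distances

def cosine_dists_to_centers(embeddings, cluster_centers, cluster_labels):
    # Drop-in alternative to the Euclidean distance above: cosine distance
    # from each embedding to the centroid of its assigned cluster
    return np.array([
        pairwise_distances(
            [embeddings[i]],
            [cluster_centers[cluster_labels[i]]],
            metric='cosine')[0, 0]
        for i in range(len(embeddings))
    ])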

Finally, we will use Spark's built-in support for "sliding" windows over a stream to apply the above function to all embeddings that occurred temporally close to one another. The sliding window ensures that an event's embedding is compared both to events that happened shortly before it and, in a subsequent overlapping window, to events that happen after it. Intuitively, we may stop regarding an event as anomalous if we ingest events in a similar embedding space a few minutes later; conversely, an event could become anomalous if future embeddings begin to drift within the subsequent sliding window. So with Spark's sliding windows, the fraud score of a single event can differ across windows. As with most streaming applications, we need to tune these time intervals to find the right balance between accuracy and latency.

Once we create our sliding window, we can simply use applyInPandas to apply the distance calculation function to all the events in the given window. Again, this makes the handoff between the developer building the streaming pipeline and the ML engineers much easier:

from pyspark.sql.functions import window

WINDOW_LENGTH = '10 minutes'  # Duration of each window
WINDOW_SLIDE = '5 minutes'    # Interval between window start times (controls overlap)

ranked_stream = embedding_stream \
    .withWatermark('ts', '5 minutes') \
    .groupBy(window(embedding_stream['ts'], WINDOW_LENGTH, WINDOW_SLIDE)) \
    .applyInPandas(get_dists_to_mean,
        'text string, ts timestamp, embeddings array<double>, kmeans_dist double')

Even though the stream is continuous, we can use Databricks' display() function to peek at the stream and sort the current results. We can clearly see that unusual text has a much higher kmeans_dist, which we could use as a fraud score. Spark determined that these events happened temporally close to each other and created a single window over the data. To learn more about the mechanics of stateful functions in Spark Structured Streaming, see the Structured Streaming programming guide.
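Outside a notebook, a minimal sketch for persisting the scored stream for downstream alerting might look like the following (the fraud_scores table and checkpoint path are hypothetical):

query = ranked_stream.writeStream \
    .format('delta') \
    .outputMode('append') \
    .option('checkpointLocation', '/tmp/checkpoints/fraud_scores') \
    .toTable('fraud_scores')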

Conclusion

While this is a simple example, it demonstrates the potential of combining low-latency event streaming and recent innovations in the application of AI. We can further extend this to use multi-modal embeddings and find hidden anomalies across all our organization’s data.

If you’re interested in learning more about real-time machine learning, join us for our talk “Getting Up to Speed on Real-Time Machine Learning” at ODSC London (virtual viewing is available). In the talk you will learn:

  • Important patterns for real-time model inference
  • How to prioritize the most common real-time ML use cases in your business
  • How to evaluate streaming tools, and why streaming is valuable at any latency
  • Operational concerns like monitoring, drift detection, and feature stores

About the authors/ODSC Europe 2023 speakers:

Dillon Bostwick is a Solutions Architect at Databricks, where he’s spent the last five years advising customers ranging from startups to Fortune 500 enterprises. He currently helps lead a team of field ambassadors for streaming products and is interested in improving industry awareness of effective streaming patterns for data integration and production machine learning. He previously worked as a product engineer in infrastructure automation.

Avinash Sooriyarachchi is a Senior Solutions Architect at Databricks. His current work involves helping large retail and consumer packaged goods organizations across the United States build machine learning-based systems. His specific interests include streaming machine learning systems and building applications that leverage foundation models. Avi holds a Master's degree in Mechanical Engineering and Applied Mechanics from the University of Pennsylvania.

Originally posted on OpenDataScience.com

Read more data science articles on OpenDataScience.com, including tutorials and guides from beginner to advanced levels! Subscribe to our weekly newsletter here and receive the latest news every Thursday. You can also get data science training on-demand wherever you are with our Ai+ Training platform. Subscribe to our fast-growing Medium Publication too, the ODSC Journal, and inquire about becoming a writer.
