Real-time stream processing with Databricks and Azure Event Hubs

Our use case

Here at Fexco, a fundamental part of our architecture requires real-time stream processing: specifically, the fast lane stream processing. To know what I’m talking about, please have a look at this post.

Basically, on the fast lane we need to listen to an Event Hub (Incoming), perform some operation on that event, and then write the output to another Event Hub (Outgoing).

This whole process needs to happen in less than 200 ms. In this post we are going to describe the approaches we tried, and the metrics we collected, until we finally met that goal.

The simplistic approach

In this approach we just followed the examples on this page, so our code looked like this:

import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StringType
import spark.implicits._

val eventHubsConfIncome = EventHubsConf("<Incoming event hub connection string>")
  .setStartingPosition(EventPosition.fromEndOfStream)

val eventHubsConfOutcome = EventHubsConf("<Outgoing event hub connection string>")
  .setStartingPosition(EventPosition.fromEndOfStream)

spark.readStream
  .format("eventhubs")
  .options(eventHubsConfIncome.toMap)
  .load()
  .withColumn("body", $"body".cast(StringType))
  .writeStream
  .format("eventhubs")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("0 second"))
  .options(eventHubsConfOutcome.toMap)
  .option("checkpointLocation", "<Some folder>")
  .start()

The code above just grabs messages from the incoming event hub and puts them on the outgoing event hub.

Even though it doesn’t appear to do anything complicated, our response times were between 750 ms and 1 second, which is way slower than our 200 ms threshold.

To know what to do from here, we need more detailed data. Basically, we need to break down the process and see where the bottleneck is. To do that, we need to collect timestamps at different stages and compare them at the end. These are the stages:

  • Incoming enqueued time (EIT): the instant the message was enqueued on the incoming event hub.
  • Message read time (MRT): the instant the message was read by the Spark stream.
  • Message processing time (MPT): the instant the message was available to be processed.
  • Outgoing enqueued time (OET): the instant the message was enqueued on the outgoing event hub.

To capture these instants in time we need to change our code a little bit. First, we need to create a class to store them; actually, two classes, and you will see why in a moment.

case class IncomingTimes(
  incomingEnqueued: java.sql.Timestamp,
  read: java.sql.Timestamp
)
case class AllTimes(
  incomingEnqueued: Long,
  read: Long,
  available: Long
)

We are still missing the outgoing enqueued time, but that can only be captured by a program listening on the outgoing event hub. To send the other instants, we change our code to look like this:

import org.apache.spark.sql.functions.{col, current_timestamp}
import org.apache.spark.sql.types.TimestampType

spark.readStream
  .format("eventhubs")
  .options(eventHubsConfIncome.toMap)
  .load()
  // Column names must match the IncomingTimes fields so that .as[IncomingTimes] works.
  .withColumn("incomingEnqueued", col("enqueuedTime").cast(TimestampType))
  .withColumn("read", current_timestamp().cast(TimestampType))
  .select("incomingEnqueued", "read")
  .as[IncomingTimes]
  .map(message => AllTimes(
    message.incomingEnqueued.getTime,
    message.read.getTime,
    System.currentTimeMillis()))
  .toJSON
  .toDF("body")
  .writeStream
  .format("eventhubs")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("0 second"))
  .options(eventHubsConfOutcome.toMap)
  .option("checkpointLocation", "<some folder>")
  .start()

Now we are replacing the incoming body with the instants that we want to capture. Then, at the client we need to collect all these instants and subtract them to see the elapsed time between stages.
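Just to make the arithmetic explicit, this is roughly what that client-side computation looks like. This is only a sketch; the StageDurations class and the durations function are illustrative names, not part of our actual client:

// Illustrative only: given the instants carried in the message body (epoch millis)
// plus the outgoing enqueued time taken from the outgoing event's metadata,
// compute the elapsed time of each stage.
case class StageDurations(
  enqueuedToRead: Long,       // MRT - EIT: time waiting in the incoming event hub
  readToProcessing: Long,     // MPT - MRT: time until the message could be processed
  processingToOutgoing: Long, // OET - MPT: time until it landed on the outgoing event hub
  total: Long                 // OET - EIT: end-to-end latency
)

def durations(times: AllTimes, outgoingEnqueued: Long): StageDurations =
  StageDurations(
    enqueuedToRead       = times.read - times.incomingEnqueued,
    readToProcessing     = times.available - times.read,
    processingToOutgoing = outgoingEnqueued - times.available,
    total                = outgoingEnqueued - times.incomingEnqueued
  )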

Also, we need to send several samples so that random interference doesn’t mislead us. Therefore, we sent 10 samples and averaged them; these are the results:

We can see here that the stage in which we are losing most of the time is between the message read and the message processing. So why is that?

If you look closely at the code, you will notice that we are using a processing time trigger. This means that every message is treated as a batch operation, and therefore a new job has to be launched every time a message is read. This imposes an overhead that explains the delay between the message read and the message processing stages.

You may be wondering what those negative values are. I’m not completely sure, but I think it’s because the event hub and Spark server clocks are not completely in sync, and what we are seeing is just the difference between those clocks.

Using a continuous trigger

In order to improve the time between MRT and MPT, we need to change our trigger to be a continuous one. Unfortunately, this is not supported by the event hubs format, but we can read from the event hubs as if they were Kafka streams. This post explains how to do it.
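For reference, and independently of that post, this is roughly how an Event Hubs namespace maps onto the Kafka options used below; the namespace name and connection string are placeholders:

// Rough mapping sketch (placeholders, not real values).
// The Kafka endpoint of an Event Hubs namespace listens on port 9093,
// and the topic name is simply the event hub (entity) name.
val kafkaBootstrapServers = "<namespace>.servicebus.windows.net:9093"
val incomingConnectionString = "<Incoming event hub connection string>"

// Authentication is SASL PLAIN with the literal username "$ConnectionString" and the
// connection string as the password. On Databricks the Kafka classes are usually shaded,
// in which case the login module needs the "kafkashaded." prefix.
val saslJaasConfig =
  "org.apache.kafka.common.security.plain.PlainLoginModule required " +
  s"""username="$$ConnectionString" password="$incomingConnectionString";"""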

A drawback of using a continuous trigger is that we cannot use the current_timestamp() function. Therefore, the MRT value will not be available in the later experiments.

Now our code should look like this:

case class IncomingTimes(
  incomingEnqueued: java.sql.Timestamp
)
case class AllTimes(
  incomingEnqueued: Long,
  available: Long
)

spark.readStream
  .format("kafka")
  .option("subscribe", "<The incoming topic>")
  .option("kafka.bootstrap.servers", "<The kafka server>")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "<The event hub connection string>")
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "30000")
  .option("kafka.group.id", "<The consumer group id>")
  .option("failOnDataLoss", "false")
  .load()
  // The Kafka source exposes the enqueued instant as the record's "timestamp" column.
  .withColumn("incomingEnqueued", col("timestamp").cast(TimestampType))
  .select("incomingEnqueued")
  .as[IncomingTimes]
  .map(message => AllTimes(
    message.incomingEnqueued.getTime,
    System.currentTimeMillis()))
  .toJSON
  .toDF("value")
  .writeStream
  .format("kafka")
  .outputMode("append")
  .option("topic", "<The outgoing topic>")
  .option("kafka.bootstrap.servers", "<The kafka server>")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "<The event hub connection string>")
  .option("checkpointLocation", "<some folder>")
  .trigger(Trigger.Continuous("750 milliseconds"))
  .start()

With our code in place we repeated the experiment and saw these results:

When I first saw these results, I was stunned.

So now we are reading quite fast, but sending messages to the outgoing event hub is now slow? We ran several experiments, changing a lot of configuration values, but we were still getting similar results.

Writing with a ForeachWriter

So we decided to write our own writer, following the examples on this page.

We just took the Kafka sink example and changed it a little bit, so our sink looked like this:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

class KafkaSink(topic: String, servers: String, ehSasl: String) extends ForeachWriter[String] {

  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("sasl.mechanism", "PLAIN")
  kafkaProperties.put("security.protocol", "SASL_SSL")
  kafkaProperties.put("sasl.jaas.config", ehSasl)

  var producer: KafkaProducer[String, String] = _

  def open(partitionId: Long, version: Long): Boolean = {
    producer = new KafkaProducer(kafkaProperties)
    true
  }

  def process(row: String): Unit = {
    producer.send(new ProducerRecord(topic, row))
  }

  def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}

And now, the rest of the code looks like this:

val writer = new KafkaSink("<The outgoing topic>", "<The kafka server>", "<The event hub connection string>")

spark.readStream
  .format("kafka")
  .option("subscribe", "<The incoming topic>")
  .option("kafka.bootstrap.servers", "<The kafka server>")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "<The event hub connection string>")
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "30000")
  .option("kafka.group.id", "<The consumer group id>")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn("incomingEnqueued", col("timestamp").cast(TimestampType))
  .select("incomingEnqueued")
  .as[IncomingTimes]
  .map(message => AllTimes(
    message.incomingEnqueued.getTime,
    System.currentTimeMillis()))
  .toJSON
  .writeStream
  .foreach(writer)
  .outputMode("update")
  .trigger(Trigger.Continuous("50 milliseconds"))
  .start()

Repeating the experiment, we got these results:

104.6 ms! That is great, and we are good to go. Nonetheless, if you look at our Kafka sink, you might suspect that it could be improved.

Using the EventHubsForeachWriter

Basically, for every message, our foreach writer creates a new Kafka producer, sends the message and then closes it.

Maybe we could create a single producer instance and share it between messages, or maybe a pool of producers, so that if a producer is busy another one could be used. A rough sketch of the shared-producer idea is shown below.
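We never actually built this, but a minimal sketch of the single shared producer idea could look something like the following (SharedProducerSink and CachedProducer are hypothetical names, and the imports are the same as for the KafkaSink above):

// Hypothetical sketch (not what we ended up using): share one KafkaProducer per executor JVM
// instead of creating a new one in every open() call.
object CachedProducer {
  private var producer: KafkaProducer[String, String] = _

  def get(props: Properties): KafkaProducer[String, String] = synchronized {
    // Create the producer only once per executor JVM and reuse it afterwards.
    if (producer == null) producer = new KafkaProducer[String, String](props)
    producer
  }
}

class SharedProducerSink(topic: String, servers: String, ehSasl: String) extends ForeachWriter[String] {

  private val props = {
    val p = new Properties()
    p.put("bootstrap.servers", servers)
    p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    p.put("sasl.mechanism", "PLAIN")
    p.put("security.protocol", "SASL_SSL")
    p.put("sasl.jaas.config", ehSasl)
    p
  }

  def open(partitionId: Long, version: Long): Boolean = true

  def process(row: String): Unit =
    CachedProducer.get(props).send(new ProducerRecord(topic, row))

  // The shared producer is deliberately left open; it lives for the executor's lifetime.
  def close(errorOrNull: Throwable): Unit = ()
}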

Luckily, we found out that the Azure Event Hubs Spark library already has a class that provides all of this: EventHubsForeachWriter. We changed our code to use that class instead of our Kafka sink, so it looks like this:

val writer = EventHubsForeachWriter(eventHubsConfOutcome)

spark.readStream
  .format("kafka")
  .option("subscribe", "<The incoming topic>")
  .option("kafka.bootstrap.servers", "<The kafka server>")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "<The event hub connection string>")
  .option("kafka.request.timeout.ms", "60000")
  .option("kafka.session.timeout.ms", "30000")
  .option("kafka.group.id", "<The consumer group id>")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn("incomingEnqueued", col("timestamp").cast(TimestampType))
  .select("incomingEnqueued")
  .as[IncomingTimes]
  .map(message => AllTimes(
    message.incomingEnqueued.getTime,
    System.currentTimeMillis()))
  .toJSON
  .writeStream
  .foreach(writer)
  .outputMode("update")
  .trigger(Trigger.Continuous("50 milliseconds"))
  .start()

We repeated the experiment and got these results:

With these results we can expect an overhead of roughly 50 ms. Therefore, if the transformation from the incoming event to the outgoing event happens in less than 100 ms, we will have real-time performance, comfortably within our 200 ms threshold.

2 Replies to "Real-time stream processing with Databricks and Azure Event Hubs"

  1. Hello, I am trying to do a similar thing (not using Kafka, but reading from one EventHub, joining with a delta table and outputting the results to a new event hub…)
    This is part of my code:
    val detectedCars = messages.join(suspectedCarStream, "LicensePlate").select(to_json(struct("*")) as 'body)
    val query = detectedCars.writeStream.foreach(writer).outputMode("update").trigger(Trigger.Continuous("500 milliseconds")).start()

    But I am getting the following error:
    notebook:17: error: type mismatch;
    found : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
    required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
    .foreach(writer)

    Any idea what could be wrong?

  2. Hi Sam,

    That error happens because the EventHubsForeachWriter can only handle strings.

    If you look at the last code snippet of this post, you will see that before writing to the stream we are calling the .toJSON function.

    That function converts the content of the DataFrame/Dataset to JSON strings, so the resulting type is a Dataset[String], which the EventHubsForeachWriter can handle.
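    For instance, a rough sketch based on your snippet (assuming spark.implicits._ is in scope) would be to convert detectedCars to a Dataset[String] before writing:

    // The to_json call already produced a "body" column, so selecting it as a
    // Dataset[String] matches the ForeachWriter[String] that EventHubsForeachWriter implements.
    val detectedCarsJson = detectedCars.select($"body").as[String]

    val query = detectedCarsJson.writeStream
      .foreach(writer)
      .outputMode("update")
      .trigger(Trigger.Continuous("500 milliseconds"))
      .start()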
