Event Replay with Spark

Background

One of the main characteristics of eventually consistent systems is that we never update data, we only add it. This topic is discussed at length in this other post, so check it out before you continue reading (really, this post will still be here when you finish).

So, what do we mean by Event Replay? Basically, it is our ability to get the latest value of an object by "replaying" the changes (events) that were made to that object.

I know, Event Replay is a lot more than that, but for the sake of simplicity, let's stick to this version for this post.

Our Event Replay Algorithm

Like many systems out there, we store most of our data in the form of tables. They come in many fashions (distributed, in-memory, and so on), but in the end it is the same concept: tables.

Events in tables

Almost all of our tables store events. To simplify the development process, we came up with a convention that makes it easy to predict how our events are stored.

This convention is best understood with an example. Let's say that we own a grocery store and want to store the items we are selling, specifically each item's name and price.

With our convention, the items table will look something like this:

Here we only have two items. What we see is a kind of history of these two items, so to get their up-to-date values we need to go through that history, that is, replay our events. We will do that in a moment, but first let's focus on what the table contains.

As you can see, the table has 8 columns. The first 3 hold the event information and the remaining 5 hold the item information.

Let’s see what we store as event information:

  • OBJECT_ID: As the name says, it holds the object's id. We use it to group the events that belong to that particular object.
  • OPERATION_TYPE: Can be create or event. It simply specifies whether we are creating an object from scratch (create) or updating it (event).
  • SEQUENCE: Holds the order in which the events happened, relative to that object.

And these are the other columns, which hold the item information:

  • NAME: The initial name
  • CHANGE_NAME: Holds the changes that were made to the name
  • PRICE: The initial price
  • CHANGE_PRICE: Holds the variations of the price
  • CREATED_DATE: The date on which the item was added to the store's stock.

Notice that CREATED_DATE is the only column without a CHANGE_ pair, and that's because it is a read-only property. In our grocery store, we don't want the date on which an item was added to stock to ever change.

Also, some of the CHANGE_ columns have null values, which means that no change should be made to the corresponding property in that particular event.
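To make the convention concrete, here is a small sketch that builds an events DataFrame with this shape. The specific events and their values are hypothetical; they are chosen only so that replaying them produces the results shown in the next section.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("event-replay").getOrCreate()
import spark.implicits._

// Hypothetical events: item 1 is created and later renamed with a small price variation,
// item 2 is created and never changes. A null (None) in a CHANGE_ column means "no change".
val eventsDF = Seq(
  // OBJECT_ID, OPERATION_TYPE, SEQUENCE, NAME, CHANGE_NAME, PRICE, CHANGE_PRICE, CREATED_DATE
  (1L, "create", 1L, Option("Oreo Cookies"), Option.empty[String], Option(1.0), Option.empty[Double], "2018/12/04"),
  (1L, "event",  2L, Option.empty[String], Option("Oreo"), Option.empty[Double], Option(0.1), "2018/12/04"),
  (2L, "create", 1L, Option("Ice Cream Sandwich"), Option.empty[String], Option(3.0), Option.empty[Double], "2018/12/05")
).toDF("OBJECT_ID", "OPERATION_TYPE", "SEQUENCE",
       "NAME", "CHANGE_NAME", "PRICE", "CHANGE_PRICE", "CREATED_DATE")

eventsDF.show(false)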

Our goal

If we replay the events of our items table, the list of items would look like this.

OBJECT_ID  NAME                PRICE  CREATED_DATE
1          Oreo                1.1    2018/12/04
2          Ice Cream Sandwich  3.0    2018/12/05

As you can see, numeric columns get a different treatment than the other ones: for them, our events simply store the variations of the value (the price in this case). For other kinds of columns, like strings and dates, each event stores the new value.

Imagine that our store is growing really fast, so instead of dozens of items we now have millions. Eventually we will need to distribute our data, and that's when replaying our events can become hard.

Spark to the rescue

One of the things I love about Spark is how easy it is to parallelize computations. So, obviously, we can use this to replay events much faster than doing everything in a single thread. The real question is: how?

Reading as DataFrames

In Spark, it is common to load everything that is stored on disk into a DataFrame, and it is important to keep working through the DataFrame API; otherwise there is little point in using Spark at all.

It is also important to remember that every operation on a DataFrame is evaluated lazily: transformations such as map or filter are only executed when you actually need the values in the DataFrame, for example when you call an action like collect, count or show.

Also, everything you put into a transformation must be serializable, because Spark will distribute the execution of that transformation across as many workers as possible.
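As a quick illustration (using the eventsDF built earlier), the transformation below only describes what to do; nothing runs until the action at the end:

import org.apache.spark.sql.functions.col

// Transformation: lazy, Spark only records it in the query plan
val updateEvents = eventsDF.filter(col("OPERATION_TYPE") === "event")

// Action: this is the point where the filter is actually executed
updateEvents.count()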

At this point it is fairly obvious that loading our events into a DataFrame is the way to go. Once the events are loaded, we need to group them by the object they belong to. The code should look something like this:

val eventsDF:DataFrame = loadEvents()
eventsDF.groupBy("OBJECT_ID")

Just grouping is not enough; to replay our events we need to apply an aggregation that computes the replay for every one of our objects. Spark doesn't come with an aggregate function that suits our needs, hence we need to build our own.

User Defined Aggregate Functions

There is a terrific guide on how to create this kind of function, and you can read it here. In short, a UDAF is just a class that applies a calculation to a subset of values.

In our case, since we are grouping our items by OBJECT_ID, our UDAF will have access to the events of a single item.
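A minimal sketch of such a UDAF for the string-like columns could look like the following (this is just one way to do it, and the details will depend on your schema): it keeps the value carried by the event with the highest SEQUENCE, taking the initial column for create rows and the CHANGE_ column for event rows, and ignoring null changes.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch of a replay UDAF for string-like columns: the result is the value carried by the
// event with the highest SEQUENCE ("create" rows carry the initial value, "event" rows carry
// the change); null changes never overwrite anything.
class EventReplay extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(
    StructField("operation_type", StringType) ::
    StructField("sequence", LongType) ::
    StructField("initial", StringType) ::
    StructField("change", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("lastSeq", LongType) ::
    StructField("lastValue", StringType) :: Nil)

  override def dataType: DataType = StringType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = -1L    // no event seen yet
    buffer(1) = null
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val seq = input.getLong(1)
    val value = if (input.getString(0) == "create") input.getString(2) else input.getString(3)
    if (value != null && seq > buffer.getLong(0)) {
      buffer(0) = seq
      buffer(1) = value
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer2.getLong(0) > buffer1.getLong(0)) {
      buffer1(0) = buffer2.getLong(0)
      buffer1(1) = buffer2.getString(1)
    }
  }

  override def evaluate(buffer: Row): Any = buffer.getString(1)
}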

Once we have our UDAF, the code will look like this:

val replay = new EventReplay

val eventsDF:DataFrame = loadEvents()
eventsDF.groupBy("OBJECT_ID").agg(
  replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("NAME"), col("CHANGE_NAME")).as("NAME"),
  replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("PRICE"), col("CHANGE_PRICE")).as("PRICE")
)

Didn't we say that numeric columns get a different treatment than the other types of columns? Well, yes, and if you don't want to end up checking types inside your UDAF, you can create another UDAF to handle numeric columns:

val replay = new EventReplay 
val num_replay = new NumericReplay 

val eventsDF:DataFrame = loadEvents() 
eventsDF.groupBy("OBJECT_ID").agg(
  replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("NAME"), col("CHANGE_NAME")).as("NAME"),
  num_replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("PRICE"), col("CHANGE_PRICE")).as("PRICE")
)

This will return a DataFrame with a shape like this:

OBJECT_ID  NAME                PRICE
1          Oreo                1.1
2          Ice Cream Sandwich  3.0
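The numeric version follows the rule we described earlier: start from the initial value carried by the create event and add every non-null variation. A sketch of NumericReplay along those lines (again, just one possible implementation) could be:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch of the numeric replay: the result is the initial value (from the "create" event)
// plus the sum of all non-null variations. Since addition is order-independent, SEQUENCE is
// not really needed here; it is kept only so both UDAFs are called with the same arguments.
class NumericReplay extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(
    StructField("operation_type", StringType) ::
    StructField("sequence", LongType) ::
    StructField("initial", DoubleType) ::
    StructField("change", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("total", DoubleType) :: Nil)

  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // "create" rows contribute the initial value (index 2), "event" rows the variation (index 3)
    val idx = if (input.getString(0) == "create") 2 else 3
    if (!input.isNullAt(idx)) buffer(0) = buffer.getDouble(0) + input.getDouble(idx)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  override def evaluate(buffer: Row): Any = buffer.getDouble(0)
}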

We are still missing a column: CREATED_DATE. This is just a read-only column, so it doesn't need any of the replay logic; picking any value from the subset is enough for us. We create a new UDAF to do this:

val replay = new EventReplay 
val num_replay = new NumericReplay 
val pick_any = new PickAny

val eventsDF:DataFrame = loadEvents()
eventsDF.groupBy("OBJECT_ID").agg(
  replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("NAME"), col("CHANGE_NAME")).as("NAME"),
  num_replay(col("OPERATION_TYPE"), col("SEQUENCE"), col("PRICE"), col("CHANGE_PRICE")).as("PRICE"),
  pick_any(col("CREATED_DATE")).as("CREATED_DATE")
)

Voila! This piece of code will return exactly what we are looking for:

OBJECT_ID  NAME                PRICE  CREATED_DATE
1          Oreo                1.1    2018/12/04
2          Ice Cream Sandwich  3.0    2018/12/05
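For completeness, a sketch of PickAny (assuming the created date is stored as a string; adjust the types to your schema) can be as simple as keeping the first non-null value it sees:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch: since the column is read-only, every event of an object carries the same value,
// so keeping the first non-null one we see is enough.
class PickAny extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("picked", StringType) :: Nil)
  override def dataType: DataType = StringType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = null
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.isNullAt(0) && !input.isNullAt(0)) buffer(0) = input.getString(0)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer1.isNullAt(0)) buffer1(0) = buffer2.getString(0)
  }

  override def evaluate(buffer: Row): Any = buffer.getString(0)
}

Since this column never changes, Spark's built-in first function (org.apache.spark.sql.functions.first) could do the job as well; a dedicated UDAF just keeps the style consistent with the other aggregations.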

2 Replies to “Event Replay with Spark”

  1. Hi, I don’t understand certain statements in the post:

    First, “In Spark, it is common to load everything that is stored on disk into a DataFrame, and it is important to keep working through the DataFrame API; otherwise there is little point in using Spark at all.”
    The very foundation of Apache Spark is in-memory computing, as opposed to the old Hadoop style that relied mostly on the hard drive.

    Secondly, you recommend parallelizing the DataFrame and then invoking the groupBy function, which is in fact a primary cause of shuffling. Have you tested the impact of groupBy without first finding out how your data is spread across the partitions? What value do you use for the property “spark.sql.shuffle.partitions”?

    I'm afraid the performance of your code won't be extraordinary because of the shuffling. Could you provide metrics?

    Thanks for the post

    1. Hi John,

      Thanks for your reply.

      Regarding your first point: what I wanted to express with that statement is that there are a lot of technologies in which you can do memory-based computation; nonetheless, there are several cases in which you cannot fit everything in memory. In my opinion, it is very easy to handle these cases with the DataFrame/Dataset API, while still being able to do memory-based computation.

      About your second point: you are right, the groupBy clause will cause shuffling. Adding a repartition before the groupBy could probably improve the query performance; nonetheless, this will always depend on the data that you have.

      What I have found is that when you are dealing with small amounts of data, it is usually better to set spark.sql.shuffle.partitions to a low value, something between 1 and 10. With bigger amounts of data, a bigger number of partitions and the repartition can perform better. Again, this depends on the amount of data you have, and it is definitely something to tune (I sketch this at the end of this reply).

      This approach maybe doesn't have the best response times, but it can handle large amounts of data that cannot fit in the memory of the cluster. I don't have metrics for every specific case, but I will write a post in the future in which I provide metrics for different approaches.
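      For reference, the kind of tuning I mean looks roughly like this (just a sketch, the values are only a starting point):

      import org.apache.spark.sql.functions.col

      // Use fewer shuffle partitions when the dataset is small
      spark.conf.set("spark.sql.shuffle.partitions", "10")

      // Optionally repartition by the grouping key before the groupBy,
      // so each object's events end up in the same partition
      eventsDF.repartition(col("OBJECT_ID")).groupBy("OBJECT_ID")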
