Computing Platform (3): Fast Lane Operations with Spark Applications

..or how to work with Spark Applications

Abstract

This article describes best practices and Continuous Delivery procedures for Spark Applications. Since there are basically three types of distribution, this document recommends the preferred one and describes the related procedures for development, testing, staging and distribution.

The technical scope covers:

For Source Code Management, we follow the guidelines in the FTS Source Code Management Workflow Document.

We will also cover the Continuous Processing mode for real-time operations and analyze the Databricks workspace.

Spark Application Main Features

  • It is responsible for launching various parallel operations on a cluster.
  • It takes the shape of a JAR file (written in Scala or Java), a notebook or a set of chained notebooks (written in Scala, Python, R or SQL) with the ipynb extension.
  • It contains the business logic and rules, handling data to perform complex operations such as extraction, transformation and load.
  • It is the process that runs the user code, which in turn creates the SparkContext object, creates RDDs and performs transformations and actions on them.
  • It accesses Apache Spark through a SparkContext object, which represents a connection to the computing cluster (from Spark 2.0 onwards, the SparkContext object is accessible through SparkSession).
  • It is responsible for converting the user program into units of physical execution called tasks.
  • It also defines distributed datasets on the cluster, to which we can apply different operations (transformations and actions).
  • The Spark program creates a logical plan, represented as a Directed Acyclic Graph (DAG), which the driver converts into a physical execution plan when the driver program runs.

Continuous Delivery

We outline the common integration points for a data pipeline in the CD cycle and how you can leverage functionalities in Databricks to integrate with your systems. Our own internal data pipelines follow the approach outlined in this blog to continuously deliver audit logs to our customers.

As a reminder, these are the Continuous Delivery stages set up in the Delivery Platform pipelines:

  • The Continuous Integration stage is the snapshot state. It provides unit, integration and automatic tests. The Driver App can be distributed through a pipeline, and it is subject to code review in the Git server with required approval.
  • The Beta stage is already the stable version, merged from the development branch. Automatic tests are provided for use by the CD pipeline.
  • The Candidate stage is subject to stress and security tests.
  • The Production stage is reached after all tests have passed. Deployment is automatic and not triggered by humans.

The following are the key phases and challenges when applying CI/CD best practices to a driver application. Note that each stage (Beta, Candidate and Production) has its own workspace:

  • Iterative development with tests: As developers build data operations by exploring data in notebooks and move towards maturity, code can quickly become unwieldy and writing tests can become a problem. Developers use their personal workspace for testing.
  • Continuous Delivery, starting with the Beta stage: As new code is merged into the stable branch, the build pipeline must be able to pull the latest changes, run the automatic tests for the various components, and publish the latest artifacts or place the notebooks in the Beta workspace.
  • Pushing the driver application to the Candidate stage: Once all the tests have passed, the build server must be able to push the driver application to the Candidate stage, to test the code on a much larger data set that resembles production data for performance and data quality.
  • Pushing the driver application to the Production stage: The final phase is pushing the driver application from staging into production, so that the next run of the driver app picks up the latest code.
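
The promotion path described above can be sketched as a small fail-fast loop. This is only an illustration in Python, not the Delivery Platform's actual pipeline code: the stage identifiers and the `run_checks` hooks are hypothetical names chosen for the example.

```python
from typing import Callable, Dict, List

# Stage names follow the article; the list order encodes the promotion path.
STAGES = ["continuous-integration", "beta", "candidate", "production"]


def promote(run_checks: Dict[str, Callable[[], bool]]) -> List[str]:
    """Walk the stages in order and stop at the first stage whose checks fail.

    run_checks maps each stage name to a zero-argument callable (a
    hypothetical hook) that returns True when that stage's tests pass.
    Returns the stages that were reached successfully.
    """
    reached = []
    for stage in STAGES:
        if not run_checks[stage]():
            break  # fail fast: later stages are never deployed
        reached.append(stage)
    return reached


if __name__ == "__main__":
    checks = {
        "continuous-integration": lambda: True,
        "beta": lambda: True,
        "candidate": lambda: False,  # e.g. the stress tests failed
        "production": lambda: True,
    }
    print(promote(checks))  # prints ['continuous-integration', 'beta']
```

The point of the sketch is that production is never reached unless every earlier stage has passed, which matches the rule that deployment is automatic and not triggered by humans.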

Spark Application Project Structure

Spark applications can be arranged in several ways.

The basic option is to package your app into a JAR file. In this case you maintain a regular Java/Scala project based on a build system (e.g. Maven, SBT or Gradle), use your favorite IDE to manage it, and keep the standard project layout for your test classes and dependencies.

When the JAR file is created it should be a fat JAR (a self-contained JVM executable package) containing all the needed dependencies.

The Databricks job can be assigned to this JAR file directly, or you can define a notebook that acts as a basic entry point, for instance to handle parameters (see the Q&A section for more cases where a notebook may be the better choice).

The other option is to write the whole Spark Application in a single notebook. In this case you handle an ipynb file, which is editable in your favorite IDE or text editor, although we recommend using your personal workspace in Databricks.

If the Spark Application's business logic is complex enough, we have the option of building a workflow. A Databricks Workflow is the recommended way to compose Spark pipelines by chaining notebooks.

The diagrams below show the several ways to compose Spark Applications. Please follow these rules and structures to arrange your code.

Jobs & Workflows

 

Version Control

The Source Code Management (SCM) workflow for Spark Applications is adapted to smaller pieces of code that may be released many times a day. It avoids development integration branches (software components are smaller and more autonomous, so this kind of dependency-based integration no longer applies) and works only with feature and master branches. Hot fixes still apply as usual, as described in the referenced document on the standard SCM workflow.

In a common scenario, several developers work at the same time on certain components. The recommended approach is to set up a development environment (Azure Databricks Workspace) per user. Each user pushes the notebook to their own personal folders in the Databricks Workspace by using the Azure Databricks UI or, preferably, the Databricks CLI. They work on their own copies of the source code in the Databricks Workspace. Then they export the files via the Azure Databricks UI/API/CLI to their local development environment and check them in to the feature branch before making a merge request against the stable branch. They also have their own small clusters for development.

Development Area

The development environment in Databricks is based on Azure and consists of four main zones:

1-Notebooks

 

2-Libraries (DBFS)

 

3-Jobs

 

4-Clusters

The recommended approach is to set up a development environment (Databricks Workspace) per user. Each user pushes the notebook to their own personal folders in the interactive workspace. They work on their own copies of the source code in the interactive workspace. They then export it via API/CLI to their local development environment and then check-in to their own branch before making a pull request against the master branch. They also have their own small clusters for the development.

 

The Databricks CLI

The Databricks CLI is used to interact with the Databricks workspace. A full explanation is available in the Databricks CLI documentation listed in the References.

For instance:

Exporting files from the Databricks workspace to the local environment:

databricks workspace export_dir sparkapp1/notebooks /home/user/dev1@fexco.com/workspace/projects/sparkapp1/notebooks

databricks workspace export_dir sparkapp1/config /home/user/dev1@fexco.com/workspace/projects/sparkapp1/config

Copying a library as a dependency to the Databricks workspace (DBFS):

databricks fs cp sparkapp1-2.1.jar dbfs:/dev1/sparkapp1/sparkapp1-2.1.jar

Importing local files into the workspace:

databricks workspace import_dir /home/user/dev1@fexco.com/workspace/projects/sparkapp1/notebooks sparkapp1/notebooks
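
When these commands are driven from a build script, it can help to wrap them so that the argument order is explicit: `export_dir` takes the workspace path first and the local path second, while `import_dir` takes them the other way round. The following Python sketch only builds the argument lists. The helper names are hypothetical, and it assumes the `databricks` CLI is installed and configured wherever the commands are finally executed.

```python
import shlex
from typing import List


def export_dir_cmd(workspace_path: str, local_path: str) -> List[str]:
    """Workspace -> local file system."""
    return ["databricks", "workspace", "export_dir", workspace_path, local_path]


def import_dir_cmd(local_path: str, workspace_path: str) -> List[str]:
    """Local file system -> workspace."""
    return ["databricks", "workspace", "import_dir", local_path, workspace_path]


def fs_cp_cmd(source: str, target: str) -> List[str]:
    """Copy a file, for example a library JAR, to DBFS."""
    return ["databricks", "fs", "cp", source, target]


if __name__ == "__main__":
    cmd = fs_cp_cmd("sparkapp1-2.1.jar", "dbfs:/dev1/sparkapp1/sparkapp1-2.1.jar")
    # Print the command instead of running it, so the sketch has no side effects.
    print(shlex.join(cmd))
    # To execute it for real: subprocess.run(cmd, check=True)
```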

 

Testing & Benchmarking

As usual, there are two phases.

1-Pre-Deployment Test & Benchmark Phase

This phase runs the unit and integration tests written for the Spark Application.

For JAR files these tests are part of the build life cycle (Maven, SBT, Gradle).

For notebooks and workflows the project will support the unit and integration tests of the code snippets contained in the notebooks.
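
One way to make notebook snippets testable is to keep the logic in plain functions that do not need a cluster, and to test those functions directly. The sketch below assumes a Python notebook and uses a made-up validation rule: `is_validated` and its rule are hypothetical and only stand in for whatever the real snippet does.

```python
import unittest


def is_validated(transaction_id):
    """Hypothetical rule, for illustration only: an ID is valid when it is
    a non-empty string made of letters and digits."""
    return isinstance(transaction_id, str) and transaction_id.isalnum()


class IsValidatedTest(unittest.TestCase):
    def test_accepts_alphanumeric_id(self):
        self.assertTrue(is_validated("TX1001"))

    def test_rejects_empty_or_missing_id(self):
        self.assertFalse(is_validated(""))
        self.assertFalse(is_validated(None))

    def test_rejects_id_with_separators(self):
        self.assertFalse(is_validated("TX-1001"))


if __name__ == "__main__":
    # Run the suite explicitly so the same cell also works inside a notebook.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(IsValidatedTest)
    unittest.TextTestRunner(verbosity=2).run(suite)
```

The same function can then be registered as a UDF or called from the notebook, while the build pipeline runs the test class without starting Spark.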

Additionally, the developer must generate the benchmarks for the operational performance of the Spark Application running in the personal workspace (not on the local machine). We'll use the benchmark generation tool to generate and store the benchmarks in relation to the specific resource in the API.

2-Post-Deployment Test & Benchmark Phase

Once the Spark Application is deployed and assigned to the specific job, the pipeline runs the post-deployment tests.

We'll see where the testing and benchmarking phases fit in the next section.

 

Life-Cycle

The Software Development Life Cycle for Spark Applications in the API is quite simple, as it is based on basic, well-known steps:

1-Developers work with a personal Databricks workspace and the local file system. Once they have written the code for the logic and tests, they generate the first set of benchmarks for the feature or fix. These benchmarks are related to the metrics obtained from the performance of database queries in storage (please read about the Benchmarking Tool).

2-Once the developer is happy with the benchmarks, the code is pushed to the SCM by following the standard SCM flow. After code review the code is eventually merged into the stable version (master), and it is then ready to be built by the build pipeline.

3-The build phase runs the pre-deployment tests (unit and integration), regardless of whether the code produces a JAR file, a notebook or a workflow.

4-For notebooks and workflows the build pipeline does not use artifact storage (Nexus); the code is retrieved from the SCM, if needed, using the given tag (so the build pipeline tags the code whose notebooks have passed the tests). Nexus is used only when JAR files are created.

5-By using the Databricks CLI, the build pipeline assigns the notebook/workflow/JAR file to a specific job.

6-Then the post-deployment phase takes place. A second generation of benchmarks is generated and stored. The build pipeline checks that the benchmarks are below the maximum tolerable threshold; only then does the build succeed.
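
The threshold check in step 6 could look like the following Python sketch. It is not the Benchmarking Tool itself: the function names, the metric names and the choice of milliseconds as the unit are all assumptions made for the example.

```python
from typing import Dict, List


def failed_benchmarks(measured_ms: Dict[str, float],
                      max_tolerable_ms: Dict[str, float]) -> List[str]:
    """Return the names of the benchmarks that exceed their threshold.

    A benchmark that has a threshold but no measurement counts as a failure,
    so a missing metric can never let a build pass silently.
    """
    failures = []
    for name, limit in max_tolerable_ms.items():
        value = measured_ms.get(name)
        if value is None or value > limit:
            failures.append(name)
    return sorted(failures)


def build_succeeds(measured_ms: Dict[str, float],
                   max_tolerable_ms: Dict[str, float]) -> bool:
    """The build passes only when no benchmark breaks its threshold."""
    return not failed_benchmarks(measured_ms, max_tolerable_ms)


if __name__ == "__main__":
    thresholds = {"load_query": 250.0, "transform": 900.0}  # hypothetical metrics
    measured = {"load_query": 180.0, "transform": 1200.0}
    print(failed_benchmarks(measured, thresholds))  # prints ['transform']
    print(build_succeeds(measured, thresholds))     # prints False
```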

Communication Flows and Packaging Modes in Spark Applications

Following asynchronous communication, Event Sourcing and integration patterns, two main ways of communication have been defined:

  1. Job operations through queues.
  2. Events sent through streams.

Spark applications are therefore used in several ways:

Slow Lane Spark Applications (Queues and Databricks Jobs)

First, the Spark Applications attached to Databricks jobs are accessible via the Databricks REST API and the Databricks CLI. This is the case for job execution requests via queues.
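
As an illustration of such a job execution request, a queue consumer could translate each message into a call to the `run-now` endpoint of the Databricks Jobs REST API (version 2.0). The Python sketch below only builds the HTTP request and does not send it. The workspace URL, token, job ID and parameter names are placeholders, and the idea of a queue consumer calling this helper is an assumption about how the pieces fit together.

```python
import json
import urllib.request
from typing import Dict


def build_run_now_request(workspace_url: str, token: str, job_id: int,
                          notebook_params: Dict[str, str]) -> urllib.request.Request:
    """Build (but do not send) a Jobs API 2.0 run-now request."""
    body = json.dumps({"job_id": job_id, "notebook_params": notebook_params})
    return urllib.request.Request(
        url=f"{workspace_url.rstrip('/')}/api/2.0/jobs/run-now",
        data=body.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_run_now_request(
        "https://example.azuredatabricks.net",  # placeholder workspace URL
        "my-token",                             # placeholder access token
        42,                                     # placeholder job ID
        {"transactionId": "TX1001"},            # placeholder parameters
    )
    print(request.get_method(), request.full_url)
    # To send it: urllib.request.urlopen(request)
```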

Secondly, the notebooks not attached to jobs listen to streams in order to receive any input (the event) and trigger a specific business logic or operation according to the received parameters.

To create a Spark Application in the Slow Lane and therefore attached to a Job you can follow any of the three approaches to create a Spark App: Notebook, JAR file, hybrid.

You can find examples and references for developing, testing, building and packaging a Spark application, including the JAR file pattern and layout, in our internal Git repository (ask us for access permission if you are interested) at https://gitlab-citp.fexcosoftware.com/cp/Spark-DriverApp-Reference.

Fast Lane Spark Applications (Streams and notebooks/hybrid)

The notebooks and the logic involved in Fast Lane operations are basically stream listeners, observing incoming events and performing low-latency operations on a periodic basis. We discuss this kind of processing in the next sections.

 

Continuous Processing

The pattern followed by Fast Lane (stream listener) Spark Applications is called Continuous Processing. It can provide end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in queries, we can choose the mode based on the API operation requirements.

It is really important to understand how Spark works in the Continuous Processing mode. In this mode, instead of launching periodic tasks, Spark launches a set of long-running tasks that continuously read, process and write data, in effect performing the target operations. At a high level, the setup and the record-level timeline look like this:

Source: Databricks

The flow is always the same:

  1. Start the Spark Application
  2. Define the incoming stream configuration
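  3. Set the settings, including the continuous processing mode (the trigger call)

import org.apache.spark.sql.streaming.Trigger

// isValidated must be registered as a SQL UDF (spark.udf.register) beforehand
incomingStream
  .filter("isValidated(transactionId)")
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Continuous("5 seconds")) // continuous mode, 5 s checkpoint interval
  .start()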

So, we have three trigger options here:

  • trigger(Trigger.ProcessingTime("1 second")) is a micro-batch query where the batch interval (the time interval between batch starts) is 1 second.
  • trigger(Trigger.ProcessingTime("0 seconds")) is a micro-batch query where the batch interval is 0; that is, Spark starts batches as soon as it can.
  • trigger(Trigger.Continuous("5 seconds")) is a continuous query where the checkpoint interval (the time interval between Spark asking for checkpoints) is 5 seconds.
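
Choosing between these options "based on the API operation requirements" can be made explicit in a small helper. The sketch below is Python and returns keyword arguments in the form accepted by PySpark's `DataStreamWriter.trigger()`. The helper name and the decision rule are assumptions: the rule relies on the figures given in the Spark documentation, where micro-batch mode reaches end-to-end latencies of roughly 100 milliseconds with exactly-once guarantees and continuous mode reaches about 1 millisecond with at-least-once guarantees.

```python
from typing import Dict

# Approximate latency floor of micro-batch mode according to the Spark
# documentation; below it, only continuous mode is an option.
MICRO_BATCH_FLOOR_MS = 100


def choose_trigger(max_latency_ms: int,
                   needs_exactly_once: bool = False,
                   checkpoint_interval: str = "5 seconds") -> Dict[str, str]:
    """Return keyword arguments for PySpark's DataStreamWriter.trigger().

    Exactly-once delivery wins over latency: continuous mode only offers
    at-least-once guarantees, so it is never chosen when exactly-once is
    required.
    """
    if needs_exactly_once or max_latency_ms >= MICRO_BATCH_FLOOR_MS:
        # Micro-batch mode; "0 seconds" starts each batch as soon as possible.
        return {"processingTime": "0 seconds"}
    return {"continuous": checkpoint_interval}


if __name__ == "__main__":
    print(choose_trigger(10))        # prints {'continuous': '5 seconds'}
    print(choose_trigger(500))       # prints {'processingTime': '0 seconds'}
    print(choose_trigger(10, True))  # prints {'processingTime': '0 seconds'}
    # In a PySpark notebook the result would be used as:
    #   stream.writeStream.trigger(**choose_trigger(10)).start()
```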

Continuous Processing and Azure Event Hubs

In the last section we mentioned the Continuous Processing mode as the preferred mode for real-time operations (included in our Fast Lane Operations Set). There are several options for handling communication with the Fast Lane operations (e.g. Kafka). As we work with Azure, we have selected Azure Event Hubs streams as the solution for sending tasks to the Fast Lane operations.

First, we need to import the Azure Event Hubs for Databricks library to our workspace.

Then, we need to pass to our notebook the settings for the connection to a specific Event Hub:

import com.microsoft.azure.eventhubs._
import spark.implicits._

// Connection settings; replace the placeholder values with your own
val policyName = "myName"
val policykey = "myKey"
val eventHubNamespace = "name"
val eventHubName = "eventhubname"
val progressDir = "/databrickstest/data1/processingalpha"

// Parameters passed to the Event Hubs source
val eventhubParameters = Map[String, String](
  "eventhubs.policyname" -> policyName,
  "eventhubs.policykey" -> policykey,
  "eventhubs.namespace" -> eventHubNamespace,
  "eventhubs.name" -> eventHubName,
  "eventhubs.partition.count" -> "1",
  "eventhubs.consumergroup" -> "$Default",
  "eventhubs.progressTrackingDir" -> "/eventhubs/progress",
  "eventhubs.sql.containsProperties" -> "true",
  "eventhubs.maxRate" -> "3"
)

Create the incoming stream object:

val incomingStream = spark.readStream.format("eventhubs").options(eventhubParameters).load()

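Then add the settings to start listening to streams through Event Hubs:

import org.apache.spark.sql.streaming.Trigger

// isValidated must be registered as a SQL UDF (spark.udf.register) beforehand
incomingStream
  .filter("isValidated(transactionId)")
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.Continuous("5 seconds")) // continuous mode, 5 s checkpoint interval
  .start()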

 

Q&A

When is a notebook preferable to a regular JAR file for Spark applications?

It is a developer decision, but some considerations are needed first. On the one hand, the JAR file offers a well-known way of including tests and a layout for our classes. We know how it works and how to make good use of JAR files. On the other hand, notebooks offer some advantages of their own:

Easy performance analysis: We can easily look at the performance of different stages by having the code broken down into different cells and looking at the time taken for each cell to execute.

Visual troubleshooting of intermediate stages: We can easily look at the intermediate results instead of searching through a lot of logs for your debug statements.

Return results from the job: We can programmatically get the exit status of the notebook and take corresponding action in the workflow externally.

When is a hybrid approach (notebook + JAR file) preferable to a regular JAR file or a notebook for Spark applications?

Again, it is a developer decision. As developers we might by default prefer putting all the code into a library and running it directly with Databricks Jobs in staging and production. While we can do that, there are some interesting advantages to the hybrid approach of keeping the core logic in libraries and using notebooks as a wrapper that stitches everything together.

Parameterization is a very good example. Normally we receive the parameters from the job requests. However, we can also change the input parameters very quickly in a notebook and run the core logic in libraries. For a mere configuration change, you don't need to recompile the JAR file, re-upload it and run it again.

Simple chaining of jobs is another good reason. We can chain a simple linear workflow with fail fast mechanisms. The workflow could be chaining different code blocks from the same library or chaining different libraries as well, managed by notebooks.
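
A linear, fail-fast chain of this kind could be sketched as follows. The Python example takes the notebook runner as a parameter so that it runs anywhere. Inside a Databricks notebook the runner would be `dbutils.notebook.run`, which returns the value the child notebook passed to `dbutils.notebook.exit`. The notebook paths and the convention that the string "OK" means success are assumptions made for the example.

```python
from typing import Callable, Dict, List, Tuple

# (notebook path, parameters) pairs; the paths used below are hypothetical.
Step = Tuple[str, Dict[str, str]]


def run_workflow(steps: List[Step],
                 run_notebook: Callable[[str, int, Dict[str, str]], str],
                 timeout_seconds: int = 600) -> List[str]:
    """Run the notebooks one after another and stop at the first failure.

    run_notebook(path, timeout_seconds, params) must return the notebook's
    exit value. Any value other than "OK" aborts the chain (assumed
    convention), so later steps never run on bad input.
    """
    results = []
    for path, params in steps:
        result = run_notebook(path, timeout_seconds, params)
        results.append(result)
        if result != "OK":
            raise RuntimeError(f"{path} returned {result!r}; workflow stopped")
    return results


if __name__ == "__main__":
    def fake_runner(path, timeout_seconds, params):
        # Stand-in for dbutils.notebook.run so the sketch runs locally.
        return "OK"

    steps = [("/sparkapp1/extract", {"day": "2018-06-01"}),
             ("/sparkapp1/transform", {}),
             ("/sparkapp1/load", {})]
    print(run_workflow(steps, fake_runner))  # prints ['OK', 'OK', 'OK']
```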

Conclusion

We have listed here the main topics about how to use Spark Applications in the API's Computing Platform. With the service and toolkit offered by our cloud provider (Azure Databricks) we can be fully operational and maximize our efficiency in Fast Lane low-latency operations.

Please find below some links to important materials. We look forward to hearing your questions and suggestions.

Cheers!

 

References

Spark Application

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-driver.html

Spark Cluster

https://spark.apache.org/docs/latest/cluster-overview.html

What is Databricks

Unified Analytics Platform

Azure Databricks

https://databricks.com/product/azure

Spark Workflows

https://databricks.com/blog/2016/08/30/notebook-workflows-the-easiest-way-to-implement-apache-spark-pipelines.html

Databricks CLI

https://docs.azuredatabricks.net/user-guide/dev-tools/databricks-cli.html

Spark Notebooks Examples

https://databricks.com/resources/type/example-notebooks

Continuous Integration & Continuous Delivery with Databricks

https://databricks.com/blog/2017/10/30/continuous-integration-continuous-delivery-databricks.html

Continuous Integration and Delivery of Apache Spark Applications at Metacog

https://databricks.com/blog/2016/04/06/continuous-integration-and-delivery-of-apache-spark-applications-at-metacog.html

Continuous Integration for Spark Applications

https://youtu.be/NgQM_dppQxU

Structured Streaming Programming Guide

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Continuous Processing

https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

 

Jesus de Diego

Author: Jesus de Diego

Software Architect and Team Lead
