Databricks: Merge Structured Stream Into SQL Server Table

Tibor Vekony
5 min read · Sep 15, 2023

Summary

In the Databricks ecosystem, merging data from a Spark Structured Stream into a Delta Lake table is a lot easier, and Delta Lake tables are really easy to connect to and use. Still, I keep coming across projects where the business users don’t want to abandon their relational database (such as SQL Server) just yet, so they want the data streamed to the database, or sometimes to both the database and a new Delta Lake table.

When I encountered this requirement for the first time, I couldn’t find an end-to-end description / demo of how to do it, so here is an example implementation that can be used as a starting point!

Supported Outputs

In the Spark Structured Streaming Documentation (link points to the latest — 3.5.0 as of writing this article), there are 5 supported output sinks:

  • File (e.g.: writing the data to a parquet, orc, json, csv, etc. file)
  • Apache Kafka
  • Console (for debugging)
  • Memory (for debugging)
  • ForEach

Now, there isn’t a specific SQL Server (or even generic JDBC) output sink there, but there is the “ForEach” output sink. Through its foreachBatch operation, this sink type can run arbitrary computation on each microbatch of the Structured Stream. This means we can do whatever we want, even run a MERGE SQL statement on each microbatch, which, as the name implies, merges the data into the SQL Server table.

By the way, the ForEach output type could be used to merge the data into an Oracle DB, send to a REST API, or really whatever is required.
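To make the idea concrete, here is a minimal sketch of how a foreachBatch callback is wired up in PySpark. The function names handle_microbatch and attach_sink are my own illustrative choices, and stream_df is assumed to be a streaming DataFrame obtained from spark.readStream.

```python
def handle_microbatch(batch_df, epoch_id):
    """Called by Spark once per microbatch; batch_df is a regular (non-streaming) DataFrame."""
    # Any batch logic can go here: a JDBC write, a MERGE statement, a REST call, ...
    row_count = batch_df.count()
    print(f"epoch {epoch_id}: {row_count} rows")
    return row_count  # Spark ignores the return value; it is only handy for testing


def attach_sink(stream_df):
    """Register the callback on a streaming DataFrame and start the query."""
    return stream_df.writeStream.foreachBatch(handle_microbatch).start()
```

Because the callback receives a plain DataFrame, everything that works in a batch job (including the JDBC writer) is available inside it.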

Step 1: Set Up the Connection to the Database

Change the code accordingly, replacing the placeholders with the name of the SQL Server, the Database & the credentials. The database connection will be established & usable via the db_connection variable.

With this approach, no additional drivers need to be installed on the Databricks cluster or during the session.
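A minimal sketch of what such a connection setup could look like, under a few assumptions: the helper names build_jdbc_url and open_db_connection are hypothetical, the connection is opened through the JVM's java.sql.DriverManager via PySpark's private py4j gateway (which may not be reachable on every cluster access mode), and encrypt=true is just an assumed default.

```python
def build_jdbc_url(server, database, port=1433):
    """Build a SQL Server JDBC URL; encrypt=true is an assumed default, adjust as needed."""
    return f"jdbc:sqlserver://{server}:{port};databaseName={database};encrypt=true"


def open_db_connection(spark, jdbc_url, user, password):
    """Open a java.sql.Connection inside the driver JVM.

    Assumption: the cluster exposes the py4j gateway (a private PySpark attribute)
    and already ships the Microsoft SQL Server JDBC driver.
    """
    driver_manager = spark.sparkContext._gateway.jvm.java.sql.DriverManager
    return driver_manager.getConnection(jdbc_url, user, password)


# Usage on a cluster (placeholders left as-is on purpose):
# jdbc_url = build_jdbc_url("<sql-server-host>", "<database-name>")
# db_connection = open_db_connection(spark, jdbc_url, "<user>", "<password>")
```

In a real workspace the credentials would more likely come from a secret scope than from literals in the notebook.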

Step 2: Mappings / Configuration

Instead of repeating code, I prefer to create generic, parameterized functions. In this case, a generic function which merges a microbatch to a SQL Server table.

The configuration below is just an example that I’ve used. Feel free to change it: add new things that you might need or remove things that you don’t. For merging, we’ll surely need a few things, like the target table, primary keys & mappings.

Description of the Attributes:

  • isActive: Boolean. If True, the configuration will be processed; if False, it’ll be skipped.
  • sourceTable: Name of the source Delta Lake table. The contents of this table will be streamed & merged into the targetTable.
  • targetTable: Name of the target table in the SQL Server Database. The contents of the sourceTable will be merged into this table.
  • primaryKeys: List of the primary key columns, which must exist in both the microbatch & the target table. Their names must be identical, so in the mapping make sure that the Delta Lake table’s primary key columns are renamed to match the PKs of the targetTable.
  • mapping: List of Spark expressions to be applied when the Delta Lake table’s contents are read. Essentially, transformations can be defined here to ensure that the microbatches that are passed into the merging function are in the correct format (schema).
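As an illustration, the attributes above could be laid out as a list of dictionaries. This is only a sketch: every table and column name is a hypothetical placeholder, and the active_entries helper is my own addition for validating the entries.

```python
REQUIRED_KEYS = {"isActive", "sourceTable", "targetTable", "primaryKeys", "mapping"}

# All table and column names below are hypothetical placeholders.
mapping = [
    {
        "isActive": True,
        "sourceTable": "silver.customer",
        "targetTable": "dbo.Customer",
        "primaryKeys": ["CustomerId"],
        "mapping": [
            "customer_id AS CustomerId",     # PK renamed to match the target table
            "customer_name AS CustomerName",
            "__START_AT",                    # sequence column used for de-duplication
            "'<epochId>' AS UpdatedBy",      # static marker, swapped for the epoch ID later
        ],
    },
    {
        "isActive": False,                   # this entry will be skipped
        "sourceTable": "silver.product",
        "targetTable": "dbo.Product",
        "primaryKeys": ["ProductId"],
        "mapping": ["product_id AS ProductId", "__START_AT", "'<epochId>' AS UpdatedBy"],
    },
]


def active_entries(entries):
    """Validate every entry, then return only the ones flagged as active."""
    for entry in entries:
        missing = REQUIRED_KEYS - set(entry)
        if missing:
            raise ValueError(f"{entry.get('targetTable', '?')}: missing {sorted(missing)}")
    return [entry for entry in entries if entry["isActive"]]
```

Validating up front means a typo in an attribute name fails fast instead of surfacing later inside a running stream.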

Step 3: Merging Function

Next, the function is defined, which will be executed for each microbatch of the Structured Stream. In this case, this is a function that merges the microbatch into the target table in the SQL Server.

The function has 3 main steps:

  1. Writes the microbatch’s contents to a newly created table on the SQL Server Database.
  2. Executes the MERGE statement.
  3. Drops the microbatch’s table.

Some characteristics of the function, that you might want to fine-tune or change based on your needs:

  • The microbatch tables’ names include the epoch ID, which is a strictly monotonically increasing number, to ensure that each microbatch table’s name is unique.
  • The function expects a sequence column (named “__START_AT” by default), which is used in the MERGE statement to filter out duplicates from the microbatch. For example: if what we’re merging is a CDC feed and in the same microbatch there are 2 states for a given Customer, then only the latest state (based on the sequence column) will be merged.
  • The records in the target table are only updated if the Epoch ID stored in the “UpdatedBy” column is lower than the Epoch ID of the microbatch that’s being merged. See the WHEN MATCHED condition!
  • In the mapping configuration, leave the “UpdatedBy” as a static “<epochId>” string if you plan on using the above-described WHEN MATCHED condition, because in the insert that static string value is replaced by the current microbatch’s Epoch ID.
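The steps and characteristics above can be sketched roughly as follows. This is not the original mergeToSqlServer() code: merge_to_sql_server and build_merge_sql are hypothetical stand-ins, and the sketch assumes unquoted table names, a numeric “UpdatedBy” column in the target, a sequence column that exists only in the microbatch table, SQL Server 2016+ (for DROP TABLE IF EXISTS), and a db_connection that is a java.sql.Connection. Instead of replacing the “<epochId>” string, it writes the epoch ID into the MERGE statement as a literal.

```python
def build_merge_sql(target_table, staging_table, primary_keys, columns, epoch_id,
                    sequence_column="__START_AT", audit_column="UpdatedBy"):
    """Build a T-SQL MERGE that keeps only the latest row per key from the staging table."""
    epoch = int(epoch_id)  # force a numeric literal before interpolating it into SQL

    def q(name):
        return f"[{name}]"

    data_columns = [c for c in columns if c not in (sequence_column, audit_column)]
    non_key_columns = [c for c in data_columns if c not in primary_keys]
    partition_by = ", ".join(q(k) for k in primary_keys)
    on_clause = " AND ".join(f"tgt.{q(k)} = src.{q(k)}" for k in primary_keys)
    update_set = ", ".join([f"tgt.{q(c)} = src.{q(c)}" for c in non_key_columns]
                           + [f"tgt.{q(audit_column)} = {epoch}"])
    insert_columns = ", ".join([q(c) for c in data_columns] + [q(audit_column)])
    insert_values = ", ".join([f"src.{q(c)}" for c in data_columns] + [str(epoch)])
    return (
        f"MERGE INTO {target_table} AS tgt\n"
        f"USING (\n"
        f"    SELECT * FROM (\n"
        f"        SELECT *, ROW_NUMBER() OVER (PARTITION BY {partition_by} "
        f"ORDER BY {q(sequence_column)} DESC) AS rn\n"
        f"        FROM {staging_table}\n"
        f"    ) AS ranked WHERE rn = 1\n"
        f") AS src\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED AND tgt.{q(audit_column)} < {epoch} THEN\n"
        f"    UPDATE SET {update_set}\n"
        f"WHEN NOT MATCHED BY TARGET THEN\n"
        f"    INSERT ({insert_columns}) VALUES ({insert_values});"
    )


def merge_to_sql_server(batch_df, epoch_id, *, db_connection, jdbc_url, jdbc_properties,
                        target_table, primary_keys):
    """foreachBatch callback: stage the microbatch, MERGE it, then drop the staging table."""
    staging_table = f"{target_table}_microbatch_{int(epoch_id)}"
    merge_sql = build_merge_sql(target_table, staging_table, primary_keys,
                                batch_df.columns, epoch_id)

    # 1. Write the microbatch to its own table (the '<epochId>' marker column is not needed there).
    batch_df.drop("UpdatedBy").write.jdbc(
        url=jdbc_url, table=staging_table, mode="overwrite", properties=jdbc_properties
    )
    statement = db_connection.createStatement()
    try:
        statement.execute(merge_sql)  # 2. Merge the staged rows into the target table.
    finally:
        try:
            statement.execute(f"DROP TABLE IF EXISTS {staging_table}")  # 3. Clean up.
        finally:
            statement.close()
```

Spark only passes the microbatch and the epoch ID to the callback, so the keyword-only arguments would have to be bound beforehand, for example with functools.partial.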

Step 4: Starting the Structured Streams

Start a Structured Stream in parallel for each entry in the mapping variable.

At the very beginning, there is a test on the isActive attribute. If it’s False, then a stream won’t be started for that entry.

Next, given that isActive equals True, a stream is started for the entry from the source Delta Lake table, selecting the data from it by executing the expressions defined in the mapping part of the configuration. This is the readStream part.

Now that we have the data read as a stream, it can be written. In the writeStream part we give the stream a user-friendly name so it’s a lot easier to find in the Spark UI if it needs to be monitored, define a checkpoint location so it becomes fault tolerant (make sure to update the placeholder value!) & execute the mergeToSqlServer() function we defined in Step 3 on each microbatch.

I’ve set the processing time to 30 seconds, but feel free to change that to one that suits your requirements!

After the stream is initialized, the loop waits for 45 seconds to make sure it’s fully initialized before it tries to start another one.
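Putting this step together, the loop could look roughly like the sketch below. The names start_streams and checkpoint_path are hypothetical, checkpoint_root stands in for the checkpoint placeholder, and merge_fn is assumed to be a function that accepts (batch_df, epoch_id) plus target_table and primary_keys keyword arguments.

```python
import time
from functools import partial


def checkpoint_path(checkpoint_root, entry):
    """One checkpoint folder per target table, so each stream tracks its own progress."""
    return f"{checkpoint_root.rstrip('/')}/{entry['targetTable']}"


def start_streams(spark, entries, merge_fn, checkpoint_root,
                  trigger="30 seconds", wait_seconds=45):
    """Start one Structured Stream per active configuration entry."""
    queries = []
    for entry in entries:
        if not entry["isActive"]:
            continue  # inactive entries never get a stream

        # readStream part: stream the Delta table and apply the configured expressions.
        source_df = spark.readStream.table(entry["sourceTable"]).selectExpr(*entry["mapping"])

        # Bind the per-entry settings so Spark can call the function with (batch_df, epoch_id).
        callback = partial(merge_fn, target_table=entry["targetTable"],
                           primary_keys=entry["primaryKeys"])

        # writeStream part: name, checkpoint, trigger interval, and the merge callback.
        query = (
            source_df.writeStream
            .queryName(f"merge_to_{entry['targetTable']}")
            .option("checkpointLocation", checkpoint_path(checkpoint_root, entry))
            .trigger(processingTime=trigger)
            .foreachBatch(callback)
            .start()
        )
        queries.append(query)
        time.sleep(wait_seconds)  # give the stream time to initialize before the next one
    return queries
```

Returning the query handles makes it possible to stop or monitor the streams later from the same notebook.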

Conclusion

This is a pretty simple, straightforward use case, but the foreachBatch / ForEach output sink type can be used to execute any arbitrary logic that’s needed: call APIs, merge into multiple targets, etc.
