Del via


Structured Streaming patterns on Azure Databricks

This contains notebooks and code samples for common patterns for working with Structured Streaming on Azure Databricks.

Getting started with Structured Streaming

If you are brand new to Structured Streaming, see Run your first Structured Streaming workload.

Write to Cassandra as a sink for Structured Streaming in Python

Apache Cassandra is a distributed, low-latency, scalable, highly-available OLTP database.

Structured Streaming works with Cassandra through the Spark Cassandra Connector. This connector supports both RDD and DataFrame APIs, and it has native support for writing streaming data. Important You must use the corresponding version of the spark-cassandra-connector-assembly.

The following example connects to one or more hosts in a Cassandra database cluster. It also specifies connection configurations such as the checkpoint location and the specific keyspace and table names:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Write to Azure Synapse Analytics using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure Synapse Analytics. See the foreachBatch documentation for details.

To run this example, you need the Azure Synapse Analytics connector. For details on the Azure Synapse Analytics connector, see Query data in Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream joins

These two notebooks show how to use stream-stream joins in Python and Scala.

Stream-Stream joins Python notebook

Get notebook

Stream-Stream joins Scala notebook

Get notebook