แชร์ผ่าน


Run your first Structured Streaming workload

This article provides code examples and explanation of basic concepts necessary to run your first Structured Streaming queries on Azure Databricks. You can use Structured Streaming for near real-time and incremental processing workloads.

Structured Streaming is one of several technologies that power streaming tables in Delta Live Tables. Databricks recommends using Delta Live Tables for all new ETL, ingestion, and Structured Streaming workloads. See What is Delta Live Tables?.

Note

While Delta Live Tables provides a slightly modified syntax for declaring streaming tables, the general syntax for configuring streaming reads and transformations applies to all streaming use cases on Azure Databricks. Delta Live Tables also simplifies streaming by managing state information, metadata, and numerous configurations.

Use Auto Loader to read streaming data from object storage

The following example demonstrates loading JSON data with Auto Loader, which uses cloudFiles to denote format and options. The schemaLocation option enables schema inference and evolution. Paste the following code in a Databricks notebook cell and run the cell to create a streaming DataFrame named raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Like other read operations on Azure Databricks, configuring a streaming read does not actually load data. You must trigger an action on the data before the stream begins.

Note

Calling display() on a streaming DataFrame starts a streaming job. For most Structured Streaming use cases, the action that triggers a stream should be writing data to a sink. See Production considerations for Structured Streaming.

Perform a streaming transformation

Structured Streaming supports most transformations that are available in Azure Databricks and Spark SQL. You can even load MLflow models as UDFs and make streaming predictions as a transformation.

The following code example completes a simple transformation to enrich the ingested JSON data with additional information using Spark SQL functions:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

The resulting transformed_df contains query instructions to load and transform each record as it arrives in the data source.

Note

Structured Streaming treats data sources as unbounded or infinite datasets. As such, some transformations are not supported in Structured Streaming workloads because they would require sorting an infinite number of items.

Most aggregations and many joins require managing state information with watermarks, windows, and output mode. See Apply watermarks to control data processing thresholds.

Perform an incremental batch write to Delta Lake

The following example writes to Delta Lake using a specified file path and checkpoint.

Important

Always make sure you specify a unique checkpoint location for each streaming writer you configure. The checkpoint provides the unique identity for your stream, tracking all records processed and state information associated with your streaming query.

The availableNow setting for the trigger instructs Structured Streaming to process all previously unprocessed records from the source dataset and then shut down, so you can safely execute the following code without worrying about leaving a stream running:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In this example, no new records arrive in our data source, so repeat execution of this code does not ingest new records.

Warning

Structured Streaming execution can prevent auto termination from shutting down compute resources. To avoid unexpected costs, be sure to terminate streaming queries.

Read data from Delta Lake, transform, and write to Delta Lake

Delta Lake has extensive support for working with Structured Streaming as both a source and a sink. See Delta table streaming reads and writes.

The following example shows example syntax to incrementally load all new records from a Delta table, join them with a snapshot of another Delta table, and write them to a Delta table:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

You must have proper permissions configured to read source tables and write to target tables and the specified checkpoint location. Fill in all parameters denoted with angle brackets (<>) using the relevant values for your data sources and sinks.

Note

Delta Live Tables provides a fully declarative syntax for creating Delta Lake pipelines and manages properties like triggers and checkpoints automatically. See What is Delta Live Tables?.

Read data from Kafka, transform, and write to Kafka

Apache Kafka and other messaging buses provide some of the lowest latency available for large datasets. You can use Azure Databricks to apply transformations to data ingested from Kafka and then write data back to Kafka.

Note

Writing data to cloud object storage adds additional latency overhead. If you wish to store data from a messaging bus in Delta Lake but require the lowest latency possible for streaming workloads, Databricks recommends configuring separate streaming jobs to ingest data to the lakehouse and apply near real-time transformations for downstream messaging bus sinks.

The following code example demonstrates a simple pattern to enrich data from Kafka by joining it with data in a Delta table and then writing back to Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

You must have proper permissions configured for access to your Kafka service. Fill in all parameters denoted with angle brackets (<>) using the relevant values for your data sources and sinks. See Stream processing with Apache Kafka and Azure Databricks.