Stream records to external services with Delta Live Tables sinks
Important
The Delta Live Tables sink API is in Public Preview.
This article describes the Delta Live Tables sink API and how to use it with DLT flows to write records transformed by a pipeline to an external data sink, such as Unity Catalog managed and external tables, Hive metastore tables, and event streaming services such as Apache Kafka or Azure Event Hubs.
What are Delta Live Tables sinks?
Delta Live Tables sinks enable you to write transformed data to targets such as event streaming services like Apache Kafka or Azure Event Hubs, and external tables managed by Unity Catalog or the Hive metastore. Previously, the streaming tables and materialized views created in a Delta Live Tables pipeline could be persisted only to Azure Databricks managed Delta tables. Using sinks, you now have more options for persisting the output of your Delta Live Tables pipelines.
When should I use Delta Live Tables sinks?
Databricks recommends using Delta Live Tables sinks if you need to:
- Build out an operational use case such as fraud detection, real-time analytics, or customer recommendations. Operational use cases typically read data from a message bus, such as an Apache Kafka topic, process the data with low latency, and write the processed records back to a message bus (see the read-side sketch after this list). This approach enables you to achieve lower latency by not writing to or reading from cloud storage.
- Write transformed data from your Delta Live Tables flows to tables managed by an external Delta instance, including Unity Catalog managed and external tables and Hive metastore tables.
- Perform reverse extract-transform-load (ETL) into sinks external to Databricks, such as Apache Kafka topics. This approach enables you to effectively support use cases where data needs to be read or used outside of Unity Catalog tables or other Databricks-managed storage.
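For example, the read side of the message-bus pattern in the first item above typically pulls records straight from a Kafka topic into a streaming table. The following is a minimal sketch, assuming hypothetical broker and topic names and omitting any authentication options your cluster requires; the write-back side is covered by the Kafka sink examples later in this article.

import dlt

@dlt.table(comment="Raw events read directly from a Kafka topic.")
def events_raw():
  return (
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker-1.example.com:9092")  # hypothetical broker address
      .option("subscribe", "incoming-events")                          # hypothetical topic name
      .load()
  )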
How do I use Delta Live Tables sinks?
Note
- Only streaming queries using spark.readStream and dlt.read_stream are supported. Batch queries are not supported.
- Only append_flow can be used to write to sinks. Other flows, such as apply_changes, are not supported.
- Running a full refresh update does not clean up previously computed results data in the sinks. This means that any reprocessed data will be appended to the sink, and existing data will not be altered.
As event data is ingested from a streaming source into your Delta Live Tables pipeline, you process and refine this data using Delta Live Tables functionality and then use append flow processing to stream the transformed data records to a Delta Live Tables sink. You create this sink using the create_sink() function. For more details on using the create_sink function, see the sink API reference.
To implement a Delta Live Tables sink, use the following steps:
- Set up a Delta Live Tables pipeline to process the streaming event data and prepare data records for writing to a Delta Live Tables sink.
- Configure and create the Delta Live Tables sink to use the preferred target sink format.
- Use an append flow to write the prepared records to the sink.
These steps are covered in the rest of this article.
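At a high level, the pieces fit together as in the following sketch, which uses placeholder table, sink, and flow names; the complete, runnable example is developed in the sections that follow.

import dlt

# Step 1: prepare records in a streaming table (placeholder source and logic).
@dlt.table
def prepared_records():
  return spark.readStream.table("some_source_table")

# Step 2: create the sink with a name, a format ("delta" or "kafka"), and format-specific options.
dlt.create_sink(
  name = "my_sink",
  format = "delta",
  options = {"tableName": "my_catalog.my_schema.my_table"}
)

# Step 3: stream the prepared records into the sink with an append flow.
@dlt.append_flow(name = "my_sink_flow", target = "my_sink")
def my_sink_flow():
  return spark.readStream.table("prepared_records")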
Set up a Delta Live Tables pipeline to prepare records for writing to a sink
The first step is to set up a Delta Live Tables pipeline to transform the raw event stream data into the prepared data you will write to your sink.
To better understand this process, you can follow this example of a Delta Live Tables pipeline that processes clickstream event data from the wikipedia-datasets sample data in Databricks. This pipeline parses the raw dataset to identify Wikipedia pages that link to an Apache Spark documentation page and progressively refines that data to just the table rows where the referring link contains Apache_Spark.
In this example, the Delta Live Tables pipeline is structured using the medallion architecture, which organizes data into different layers to enhance quality and processing efficiency.
To start, load the raw JSON records from the dataset into your bronze layer using Auto Loader. This Python code demonstrates how to create a streaming table named clickstream_raw, which contains the raw, unprocessed data from the source:
import dlt
from pyspark.sql.functions import expr  # expr is used by the silver and gold layer code below

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
  comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
  table_properties={
    "quality": "bronze"
  }
)
def clickstream_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("inferSchema", "true")
      .load(json_path)
  )
After this code runs, the data is now at the “bronze” (or “raw data”) level of the medallion architecture and must be cleaned up. The next step refines the data to the “silver” level, which involves cleaning up data types and column names and using Delta Live Tables expectations to ensure data integrity.
The following code demonstrates how to do this by cleaning and validating the bronze layer data into the clickstream_clean silver table:
@dlt.table(
  comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
  table_properties={
    "quality": "silver"
  }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
  return (
    spark.readStream.table("clickstream_raw")
      .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )
To develop the “gold” layer of your pipeline structure, you filter the cleaned clickstream data to isolate the entries that link to the Apache_Spark page. In this last code example, you select only the columns necessary for writing to your target sink table.
The following code illustrates how to create a table called spark_referrers representing the gold layer:
@dlt.table(
  comment="A table of the most common pages that link to the Apache Spark page.",
  table_properties={
    "quality": "gold"
  }
)
def spark_referrers():
  return (
    spark.readStream.table("clickstream_clean")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .select("referrer", "current_page_id", "current_page_title", "click_count")
  )
After this data preparation process is completed, you must configure the destination sinks into which the cleaned records will be written.
Configure a Delta Live Tables sink
Databricks supports three types of destination sinks to which you can write the records processed from your streaming data:
- Delta table sinks
- Apache Kafka sinks
- Azure Event Hubs sinks
Below are examples of configurations for Delta, Kafka, and Azure Event Hubs sinks:
Delta sinks
To create a Delta sink by file path:
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
To create a Delta sink by table name using a fully qualified catalog and schema path:
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)
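If your pipeline is configured to use the Hive metastore instead of Unity Catalog, the Delta sink is created the same way, but the table name is schema-qualified rather than three-level. A minimal sketch with placeholder names:

dlt.create_sink(
  name = "hms_delta_sink",
  format = "delta",
  options = { "tableName": "my_schema.my_table" }  # placeholder Hive metastore table
)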
Kafka and Azure Event Hubs sinks
The kafka format works for both Apache Kafka and Azure Event Hubs sinks. The following example creates an Azure Event Hubs sink using its Kafka-compatible endpoint:
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
  name = "eh_sink",
  format = "kafka",
  options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)
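If your target is a plain Apache Kafka cluster rather than Azure Event Hubs, the Event Hubs SASL settings above are not needed. The following is a minimal sketch with a placeholder broker address and topic name; add the kafka.security.protocol and kafka.sasl.* options that your cluster requires.

dlt.create_sink(
  name = "kafka_sink",
  format = "kafka",
  options = {
    "kafka.bootstrap.servers": "broker-1.example.com:9092",  # placeholder broker address
    "topic": "dlt-topic"                                      # placeholder topic name
  }
)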
Now that your sink is configured and your Delta Live Tables pipeline is prepared, you can begin streaming processed records to the sink.
Write to a Delta Live Tables sink with an append flow
With your sink configured, the next step is to write processed records to it by specifying it as the target for records output by an append flow. You do this by specifying your sink as the target value in the append_flow decorator.
- For Unity Catalog managed and external tables, use the delta format and specify the path or table name in options. Your Delta Live Tables pipelines must be configured to use Unity Catalog.
- For Apache Kafka topics, use the kafka format and specify the topic name, connection information, and authentication information in the options. These are the same options a Spark Structured Streaming Kafka sink supports. See Configure the Kafka Structured Streaming writer.
- For Azure Event Hubs, use the kafka format and specify the Event Hubs name, connection information, and authentication information in the options. These are the same options supported in a Spark Structured Streaming Event Hubs sink that uses the Kafka interface. See Service Principal authentication with Microsoft Entra ID and Azure Event Hubs.
- For Hive metastore tables, use the delta format and specify the path or table name in options. Your Delta Live Tables pipelines must be configured to use the Hive metastore.
Below are examples of how to set up flows to write to Delta, Kafka, and Azure Event Hubs sinks with records processed by your Delta Live Tables pipeline.
Delta sink
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka and Azure Event Hubs sinks
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
The value parameter is mandatory for an Azure Event Hubs sink. Additional parameters such as key, partition, headers, and topic are optional.
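For example, a flow that supplies only the required value column could look like the following sketch (the flow name is a placeholder):

@dlt.append_flow(name = "eh_value_only_flow", target = "eh_sink")
def eh_value_only_flow():
  return (
    spark.readStream.table("spark_referrers")
      .selectExpr("to_json(struct(referrer, current_page_title, click_count)) AS value")
  )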
For more details on the append_flow decorator, see Use append flow to write to a streaming table from multiple source streams.
Limitations
- Only the Python API is supported. SQL is not supported.
- Only streaming queries using spark.readStream and dlt.read_stream are supported. Batch queries are not supported.
- Only append_flow can be used to write to sinks. Other flows, such as apply_changes, are not supported, and you cannot use a sink in a Delta Live Tables dataset definition. For example, the following is not supported:

  @table("from_sink_table")
  def fromSink():
    return read_stream("my_sink")

- For Delta sinks, the table name must be fully qualified. Specifically, for Unity Catalog managed and external tables, the table name must be of the form <catalog>.<schema>.<table>. For Hive metastore tables, it must be of the form <schema>.<table>.
- Running a full refresh update does not clean up previously computed results data in the sinks. This means that any reprocessed data will be appended to the sink, and existing data will not be altered.
- Delta Live Tables expectations are not supported.