Records streamen naar externe services met DLT-sinks
Belangrijk
De DLT sink
-API bevindt zich in openbare voorvertoning.
In dit artikel wordt de DLT sink
-API beschreven en hoe u deze kunt gebruiken met DLT-stromen om records te schrijven die zijn getransformeerd door een pijplijn naar een externe gegevenssink, zoals beheerde en externe tabellen van Unity Catalog, Hive-metastoretabellen en streamingservices voor gebeurtenissen, zoals Apache Kafka of Azure Event Hubs.
Wat zijn DLT-sinks?
Met DLT-sinks kunt u getransformeerde gegevens schrijven naar doelen zoals gebeurtenisstreamingservices zoals Apache Kafka of Azure Event Hubs, en externe tabellen die worden beheerd door Unity Catalog of de Hive-metastore. Voorheen konden de streamingtabellen en gerealiseerde weergaven die in een DLT-pijplijn zijn gemaakt, alleen worden bewaard in door Azure Databricks beheerde Delta-tabellen. Als u sinks gebruikt, hebt u nu meer opties voor het behouden van de uitvoer van uw DLT-pijplijnen.
Wanneer moet ik DLT-sinks gebruiken?
Databricks raadt het gebruik van DLT-sinks aan als u het volgende moet doen:
- Bouw een operationele use-case op, zoals fraudedetectie, realtime analyses en aanbevelingen van klanten. Operationele use cases lezen doorgaans gegevens uit een berichtenbus, zoals een Apache Kafka-onderwerp, en verwerken vervolgens gegevens met een lage latentie en schrijven de verwerkte records terug naar een berichtenbus. Met deze benadering kunt u een lagere latentie bereiken door niet te schrijven of te lezen vanuit cloudopslag.
- Schrijf getransformeerde gegevens van uw DLT-stromen naar tabellen die worden beheerd door een extern Delta-exemplaar, waaronder beheerde en externe tabellen van Unity Catalog en Hive-metastoretabellen.
- Voer het reverse extract-transform-load (ETL)-proces uit naar doelen buiten Databricks, zoals Apache Kafka-topics. Met deze aanpak kunt u effectief gebruiksvoorbeelden ondersteunen waarbij gegevens moeten worden gelezen of gebruikt buiten Unity Catalog-tabellen of andere door Databricks beheerde opslag.
Hoe gebruik ik DLT-sinks?
Notitie
- Alleen streamingquery's die gebruikmaken van
spark.readStream
endlt.read_stream
worden ondersteund. Batchquery's worden niet ondersteund. - Alleen
append_flow
kan worden gebruikt om naar sinks te schrijven. Andere stromen, zoalsapply_changes
, worden niet ondersteund. - Het uitvoeren van een volledige vernieuwingsupdate ruimt niet automatisch eerder berekende gegevens op in de sinks. Dit betekent dat alle opnieuw verwerkte gegevens worden toegevoegd aan de sink en dat bestaande gegevens niet worden gewijzigd.
Wanneer gebeurtenisgegevens worden opgenomen uit een streamingbron in uw DLT-pijplijn, verwerkt en verfijnt u deze gegevens met behulp van DLT-functionaliteit en gebruikt u vervolgens de verwerking van toevoegstromen om de getransformeerde gegevensrecords naar een DLT-sink te streamen. U maakt deze sink met behulp van de functie create_sink()
. Zie de sink API-referentievoor meer informatie over het gebruik van de create_sink
functie.
Gebruik de volgende stappen om een DLT-sink te implementeren:
- Stel een DLT-pijplijn in om de streaming-gebeurtenisgegevens te verwerken en gegevensrecords voor te bereiden voor het schrijven naar een DLT-sink.
- Configureer en maak de DLT-sink om de gewenste doelsinkindeling te gebruiken.
- Gebruik een toevoegstroom om de voorbereide records naar de sink te schrijven.
Deze stappen worden behandeld in de rest van het onderwerp.
een DLT-pijplijn instellen om records voor te bereiden voor schrijven naar een sink
De eerste stap is het instellen van een DLT-pijplijn om de onbewerkte gebeurtenisstroomgegevens te transformeren in de voorbereide gegevens die u naar uw sink schrijft.
Als u dit proces beter wilt begrijpen, volgt u dit voorbeeld van een DLT-pijplijn die clickstream-gebeurtenisgegevens verwerkt uit de wikipedia-datasets
voorbeeldgegevens in Databricks. Deze pijplijn parseert de onbewerkte gegevensset om Wikipedia-pagina's te identificeren die zijn gerelateerd aan een Apache Spark-documentatiepagina en verfijnt die gegevens geleidelijk tot slechts de tabelrijen waarin de verwijzende koppeling Apache_Spark.
bevat.
In dit voorbeeld is de DLT-pijplijn gestructureerd met behulp van de medaillon-architectuur, die gegevens in verschillende lagen ordent om de kwaliteit en efficiƫntie van verwerking te verbeteren.
Begin met het laden van de onbewerkte JSON-records uit de gegevensset in uw bronslaag met behulp van Auto Loader. Deze Python-code laat zien hoe u een streamingtabel maakt met de naam clickstream_raw
, die de onbewerkte, niet-verwerkte gegevens uit de bron bevat:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
Nadat deze code is uitgevoerd, bevinden de gegevens zich nu op het niveau 'brons' (of 'onbewerkte gegevens') van deMedalarchitectuur en moeten ze worden opgeschoond. De volgende stap verfijnt gegevens tot het 'zilver'-niveau, waarbij gegevenstypen en kolomnamen worden opgeschoond en DLT-verwachtingen worden gebruikt om de gegevensintegriteit te waarborgen.
De volgende code laat zien hoe u dit doet door de bronslaaggegevens op te schonen en te valideren in de clickstream_clean
zilveren tabel:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
Als u de 'gouden' laag van uw pijplijnstructuur wilt ontwikkelen, filtert u de opgeschoonde clickstream-gegevens om vermeldingen te isoleren waarin de verwijzende pagina is Apache_Spark
. In dit laatste codevoorbeeld selecteert u alleen de kolommen die nodig zijn voor het schrijven naar de doelsinktabel.
De volgende code illustreert hoe u een tabel maakt met de naam spark_referrers
die de gouden laag vertegenwoordigt:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
Nadat dit gegevensvoorbereidingsproces is voltooid, moet u de doelsinks configureren waarin de opgeschoonde records worden geschreven.
een DLT-sink configureren
Databricks ondersteunt drie typen doelsinks waarin u uw records schrijft die zijn verwerkt op basis van uw streamgegevens:
- Delta-gootstenen voor tafels
- Apache Kafka-sinks
- Azure Event Hubs-sinks
Hieronder ziet u voorbeelden van configuraties voor delta-, kafka- en azure event hubs-sinks:
Delta sinks
Een Delta-sink maken via een bestandspad:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
Een Delta-sink maken op tabelnaam met behulp van een volledig gekwalificeerde catalogus en schemapad:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Kafka- en Azure Event Hubs-sinks
Deze code werkt voor zowel Apache Kafka als Azure Event Hubs-sinks.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
Nu uw sink is geconfigureerd en uw DLT-pijplijn is voorbereid, kunt u beginnen met het streamen van verwerkte records naar de sink.
Schrijven naar een DLT-sink met een toevoegstroom
Wanneer uw sink is geconfigureerd, bestaat de volgende stap uit het schrijven van verwerkte records door deze op te geven als het doel voor records die worden uitgevoerd door een toevoegstroom. U doet dit door uw sink op te geven als de target
waarde in de append_flow
decorator.
- Voor beheerde en externe tabellen van Unity Catalog gebruikt u de indeling
delta
en geeft u het pad of de tabelnaam op in opties. Uw DLT-pijplijnen moeten zijn geconfigureerd voor het gebruik van Unity Catalog. - Voor Apache Kafka-onderwerpen gebruikt u de indeling
kafka
en geeft u de onderwerpnaam, verbindingsinformatie en verificatiegegevens op in de opties. Dit zijn dezelfde opties die een Spark Structured Streaming Kafka-sink ondersteunt. Zie De Kafka Structured Streaming Writer configureren. - Voor Azure Event Hubs gebruikt u de indeling
kafka
en geeft u de naam, verbindingsgegevens en verificatiegegevens van Event Hubs op in de opties. Dit zijn dezelfde opties die worden ondersteund in een Spark Structured Streaming Event Hubs-sink die gebruikmaakt van de Kafka-interface. Zie Service Principal-verificatie met Microsoft Entra ID en Azure Event Hubs. - Voor Hive-metastore-tabellen gebruikt u de indeling
delta
en geeft u het pad of de tabelnaam op in opties. Uw DLT-pijplijnen moeten zijn geconfigureerd voor het gebruik van de Hive-metastore.
Hieronder ziet u voorbeelden van het instellen van stromen voor het schrijven naar Delta-, Kafka- en Azure Event Hubs-sinks met records die zijn verwerkt door uw DLT-pijplijn.
Delta-sink
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka- en Azure Event Hubs-sinks
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
De parameter value
is verplicht voor een Azure Event Hubs-sink. Aanvullende parameters, zoals key
, partition
, headers
en topic
zijn optioneel.
Voor meer informatie over de append_flow
decorator, zie Toevoegstroom gebruiken om vanuit meerdere bronstromen naar een streamingtabel te schrijven.
beperkingen voor
Alleen de Python-API wordt ondersteund. SQL wordt niet ondersteund.
Alleen streamingquery's die gebruikmaken van
spark.readStream
endlt.read_stream
worden ondersteund. Batchquery's worden niet ondersteund.Alleen
append_flow
kunnen worden gebruikt om naar sinks te schrijven. Andere stromen, zoalsapply_changes
, worden niet ondersteund en u kunt geen sink gebruiken in een DLT-gegevenssetdefinitie. Het volgende wordt bijvoorbeeld niet ondersteund:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Voor Delta-sinks moet de tabelnaam volledig gespecificeerd zijn. Met name voor door Unity Catalog beheerde externe tabellen moet de tabelnaam van het formulier
<catalog>.<schema>.<table>
zijn. Voor de Hive-metastore moet deze in de vorm<schema>.<table>
zijn.Wanneer
FullRefresh
wordt uitgevoerd, worden eerder berekende resultaten niet opgeschoond in de sinks. Dit betekent dat alle opnieuw verwerkte gegevens worden toegevoegd aan de sink en dat bestaande gegevens niet worden gewijzigd.DLT-verwachtingen worden niet ondersteund.