Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables
Tento článek vysvětluje, jak pomocí tabulek Delta Live zpracovávat zprávy ze služby Azure Event Hubs. Konektor Služby Event Hubs strukturovaného streamování nemůžete použít, protože tato knihovna není k dispozici v rámci modulu Databricks Runtime a Delta Live Tables neumožňuje používat knihovny JVM třetích stran.
Jak se můžou delta živé tabulky připojit ke službě Azure Event Hubs?
Azure Event Hubs poskytuje koncový bod kompatibilní s Apache Kafka, který můžete použít s konektorem Kafka strukturovaného streamování, který je k dispozici v Databricks Runtime, ke zpracování zpráv ze služby Azure Event Hubs. Další informace o kompatibilitě služby Azure Event Hubs a Apache Kafka najdete v tématu Použití služby Azure Event Hubs z aplikací Apache Kafka.
Následující kroky popisují připojení kanálu Delta Live Tables k existující instanci služby Event Hubs a využívání událostí z tématu. K provedení těchto kroků potřebujete následující hodnoty připojení služby Event Hubs:
- Název oboru názvů služby Event Hubs.
- Název instance centra událostí v oboru názvů Event Hubs.
- Název zásady sdíleného přístupu a klíč zásad pro službu Event Hubs. Ve výchozím nastavení
RootManageSharedAccessKey
se pro každý obor názvů služby Event Hubs vytvoří zásada. Tato zásada mámanage
send
oprávnění alisten
oprávnění. Pokud váš kanál čte jenom ze služby Event Hubs, doporučuje Databricks vytvořit novou zásadu pouze s oprávněním pro naslouchání.
Další informace o službě Event Hubs připojovací řetězec najdete v tématu Získání služby Event Hubs připojovací řetězec.
Poznámka:
- Azure Event Hubs poskytuje možnosti OAuth 2.0 i sdíleného přístupového podpisu (SAS) pro autorizaci přístupu k vašim zabezpečeným prostředkům. Tyto pokyny používají ověřování založené na SAS.
- Pokud získáte službu Event Hubs připojovací řetězec z webu Azure Portal, nemusí obsahovat
EntityPath
hodnotu. TatoEntityPath
hodnota se vyžaduje pouze při použití konektoru Event Hubs strukturovaného streamování. Použití konektoru Kafka strukturovaného streamování vyžaduje zadání pouze názvu tématu.
Uložení klíče zásad do tajného kódu Azure Databricks
Protože klíč zásad je citlivé informace, Databricks doporučuje nezakódovat hodnotu v kódu kanálu. Místo toho použijte tajné kódy Azure Databricks k ukládání a správě přístupu ke klíči.
Následující příklad používá rozhraní příkazového řádku Databricks k vytvoření oboru tajného kódu a uložení klíče do tohoto oboru tajného kódu. V kódu kanálu použijte dbutils.secrets.get()
funkci s scope-name
hodnotou klíče a shared-policy-name
načtěte ji.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Další informace o tajných klíčích Azure Databricks najdete v tématu Správa tajných kódů.
Vytvoření poznámkového bloku a přidání kódu kanálu pro zpracování událostí
Následující příklad čte události IoT z tématu, ale můžete přizpůsobit příklad požadavkům vaší aplikace. Jako osvědčený postup doporučuje Databricks ke konfiguraci proměnných aplikace použít nastavení kanálu Delta Live Tables. Kód kanálu pak pomocí spark.conf.get()
funkce načte hodnoty. Další informace o použití nastavení kanálu k parametrizaci kanálu najdete v tématu Použití parametrů s kanály Delta Live Tables.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Vytvoření kanálu
Vytvořte nový kanál s následujícím nastavením a nahraďte zástupné hodnoty odpovídajícími hodnotami pro vaše prostředí.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Nahradit
<container-name>
s názvem kontejneru účtu úložiště Azure.<storage-account-name>
s názvem účtu úložiště ADLS Gen2.<eh-namespace>
nahraďte názvem vašeho oboru názvů Event Hubs.<eh-policy-name>
klíč oboru tajného klíče pro klíč zásad služby Event Hubs.<eventhub>
s názvem vaší instance služby Event Hubs.<secret-scope-name>
s názvem oboru tajných kódů Azure Databricks, který obsahuje klíč zásad služby Event Hubs.
Osvědčeným postupem je, že tento kanál nepoužívá výchozí cestu k úložišti DBFS, ale místo toho používá účet úložiště Azure Data Lake Storage Gen2 (ADLS Gen2). Další informace o konfiguraci ověřování pro účet úložiště ADLS Gen2 najdete v tématu Zabezpečený přístup k přihlašovacím údajům úložiště pomocí tajných kódů v kanálu.