Verwenden von Azure Event Hubs als Delta Live Tables-Datenquelle
In diesem Artikel wird erläutert, wie Sie Delta Live Tables verwenden, um Nachrichten aus Azure Event Hubs zu verarbeiten. Sie können den Connector für strukturiertes Streaming mit Event Hubs nicht verwenden, da diese Bibliothek nicht als Teil von Databricks Runtime verfügbar ist und Delta Live Tables nicht zulässt, JVM-Bibliotheken von Drittanbietern zu verwenden.
Wie kann Delta Live Tables eine Verbindung mit Azure Event Hubs herstellen?
Azure Event Hubs stellt einen mit Apache Kafka kompatiblen Endpunkt bereit, den Sie mit dem in Databricks Runtime verfügbaren Kafka-Connector für strukturiertes Streaming verwenden können, um Nachrichten von Azure Event Hubs zu verarbeiten. Weitere Informationen zur Azure Event Hubs- und Apache Kafka-Kompatibilität finden Sie unter Verwenden von Azure Event Hubs aus Apache Kafka-Anwendungen.
In den folgenden Schritten wird beschrieben, wie Sie eine Delta Live Tables-Pipeline mit einer vorhandenen Event Hubs-Instanz verbinden und Ereignisse aus einem Thema nutzen. Zum Ausführen dieser Schritte benötigen Sie die folgenden Event Hubs-Verbindungswerte:
- Den Namen des Event Hubs-Namespace.
- Den Namen der Event Hub-Instanz im Event Hubs-Namespace.
- Den Namen der SAS-Richtlinie und den Richtlinienschlüssel für Event Hubs. Standardmäßig wird eine
RootManageSharedAccessKey
-Richtlinie für jeden Event Hubs-Namespace erstellt. Diese Richtlinie verfügt übermanage
-,send
- undlisten
-Berechtigungen. Wenn Ihre Pipeline nur aus Event Hubs liest, empfiehlt Databricks, eine neue Richtlinie nur mit Lauschberechtigung zu erstellen.
Weitere Informationen zur Event Hubs-Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge.
Hinweis
- Azure Event Hubs bietet sowohl OAuth 2.0- als auch SAS-Optionen (Shared Access Signature), um den Zugriff auf Ihre sicheren Ressourcen zu autorisieren. In diesen Anweisungen wird die SAS-basierte Authentifizierung verwendet.
- Wenn Sie die Event Hubs-Verbindungszeichenfolge aus dem Azure-Portal abrufen, enthält sie möglicherweise nicht den
EntityPath
-Wert. DerEntityPath
-Wert ist nur erforderlich, wenn der Event Hubs-Connector für strukturiertes Streaming verwendet wird. Bei Verwendung des Kafka-Connectors für strukturiertes Streaming ist nur die Angabe des Themennamens erforderlich.
Speichern des Richtlinienschlüssels in einem Azure Databricks-Geheimnis
Da es sich bei dem Richtlinienschlüssel um vertrauliche Informationen handelt, empfiehlt Databricks, den Wert im Pipelinecode nicht hartzucodieren. Verwenden Sie stattdessen Azure Databricks-Geheimnisse, um den Zugriff auf den Schlüssel zu speichern und zu verwalten.
Im folgenden Beispiel wird die Databricks CLI verwendet, um einen geheimen Bereich zu erstellen und den Schlüssel in diesem geheimen Bereich zu speichern. Verwenden Sie die dbutils.secrets.get()
-Funktion in Ihrem Pipelinecode mit scope-name
und shared-policy-name
, um den Schlüsselwert abzurufen.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Weitere Informationen zu Azure Databricks-Geheimnissen finden Sie unter Geheimnisverwaltung.
Erstellen eines Notebooks und Hinzufügen des Pipelinecodes zum Nutzen von Ereignissen
Im folgenden Beispiel werden IoT-Ereignisse aus einem Thema gelesen. Sie können das Beispiel jedoch an die Anforderungen Ihrer Anwendung anpassen. Als bewährte Methode empfiehlt Databricks, die Delta Live Tables-Pipelineeinstellungen zum Konfigurieren von Anwendungsvariablen zu verwenden. Ihr Pipelinecode ruft dann Werte über die spark.conf.get()
-Funktion ab. Weitere Informationen zur Verwendung von Pipelineeinstellungen zum Parametrisieren ihrer Pipeline finden Sie unter Verwenden von Parametern mit Delta Live Tables-Pipelines.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Erstellen der Pipeline
Erstellen Sie eine neue Pipeline mit den folgenden Einstellungen, und ersetzen Sie die Platzhalterwerte durch geeignete Werte für Ihre Umgebung.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
<container-name>
durch den Namen eines Azure-Speicherkontocontainers.<storage-account-name>
durch den Namen eines ADLS Gen2-Speicherkontos.<eh-namespace>
durch den Namen Ihres Event Hubs-Namespace.<eh-policy-name>
durch den geheimen Bereichsschlüssel für den Event Hubs-Richtlinienschlüssel.<eventhub>
durch den Namen Ihrer Event Hubs-Instanz.<secret-scope-name>
durch den Namen des Azure Databricks-Geheimnisbereichs, der den Event Hubs-Richtlinienschlüssel enthält.
Als bewährte Methode verwendet diese Pipeline nicht den DBFS-Standardspeicherpfad, sondern stattdessen ein Azure Data Lake Storage Gen2-Speicherkonto (ADLS Gen2). Weitere Informationen zum Konfigurieren der Authentifizierung für ein ADLS Gen2-Speicherkonto finden Sie unter Sicheres Zugreifen auf Storage-Anmeldeinformationen mit Geheimnissen in einer Pipeline.