Azure Event Hubs gebruiken als gegevensbron van Delta Live Tables
In dit artikel wordt uitgelegd hoe u Delta Live Tables gebruikt om berichten van Azure Event Hubs te verwerken. U kunt de Event Hubs-connector voor gestructureerd streamen niet gebruiken omdat deze bibliotheek niet beschikbaar is als onderdeel van Databricks Runtime, en Delta Live Tables staat u niet toe om JVM-bibliotheken van derden te gebruiken.
Hoe kunnen Delta Live Tables verbinding maken met Azure Event Hubs?
Azure Event Hubs biedt een eindpunt dat compatibel is met Apache Kafka die u kunt gebruiken met de Structured Streaming Kafka-connector, beschikbaar in Databricks Runtime, om berichten van Azure Event Hubs te verwerken. Zie Azure Event Hubs gebruiken vanuit Apache Kafka-toepassingen voor meer informatie over Azure Event Hubs en Apache Kafka-compatibiliteit.
In de volgende stappen wordt beschreven hoe u een Delta Live Tables-pijplijn verbindt met een bestaand Event Hubs-exemplaar en gebeurtenissen uit een onderwerp gebruikt. U hebt de volgende Event Hubs-verbindingswaarden nodig om deze stappen uit te voeren:
- De naam van de Event Hubs-naamruimte.
- De naam van het Event Hub-exemplaar in de Event Hubs-naamruimte.
- Een naam en beleidssleutel voor gedeeld toegangsbeleid voor Event Hubs. Standaard wordt er een
RootManageSharedAccessKey
beleid gemaakt voor elke Event Hubs-naamruimte. Dit beleid heeftmanage
ensend
listen
machtigingen. Als uw pijplijn alleen leest uit Event Hubs, raadt Databricks aan om alleen een nieuw beleid met een listen-machtiging te maken.
Zie Een Event Hubs-verbindingsreeks ophalen voor meer informatie over de Event Hubs-verbindingsreeks.
Notitie
- Azure Event Hubs biedt zowel OAuth 2.0- als SAS-opties (Shared Access Signature) om toegang tot uw beveiligde resources te autoriseren. In deze instructies wordt verificatie op basis van SAS gebruikt.
- Als u de Event Hubs-verbindingsreeks van Azure Portal krijgt, bevat deze mogelijk niet de
EntityPath
waarde. DeEntityPath
waarde is alleen vereist wanneer u de Event Hubs-connector voor gestructureerd streamen gebruikt. Voor het gebruik van de Structured Streaming Kafka-connector moet alleen de onderwerpnaam worden opgegeven.
De beleidssleutel opslaan in een Azure Databricks-geheim
Omdat de beleidssleutel gevoelige informatie is, raadt Databricks aan om de waarde in uw pijplijncode niet hardcodering uit te voeren. Gebruik in plaats daarvan Azure Databricks-geheimen om de toegang tot de sleutel op te slaan en te beheren.
In het volgende voorbeeld wordt de Databricks CLI gebruikt om een geheim bereik te maken en de sleutel op te slaan in dat geheime bereik. Gebruik in uw pijplijncode de dbutils.secrets.get()
functie met de scope-name
functie en shared-policy-name
om de sleutelwaarde op te halen.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Zie Geheimbeheer voor meer informatie over Azure Databricks-geheimen.
Een notebook maken en de pijplijncode toevoegen om gebeurtenissen te gebruiken
In het volgende voorbeeld worden IoT-gebeurtenissen uit een onderwerp gelezen, maar u kunt het voorbeeld aanpassen voor de vereisten van uw toepassing. Databricks raadt aan om de pijplijninstellingen van Delta Live Tables te gebruiken om toepassingsvariabelen te configureren. Uw pijplijncode gebruikt vervolgens de spark.conf.get()
functie om waarden op te halen. Zie Parameters gebruiken met Delta Live Tables-pijplijnen voor meer informatie over het gebruik van pijplijninstellingen om uw pijplijn te parameteriseren.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Maak de pijplijn
Maak een nieuwe pijplijn met de volgende instellingen, waarbij u de waarden van de tijdelijke aanduiding vervangt door de juiste waarden voor uw omgeving.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
<container-name>
met de naam van een Azure Storage-accountcontainer.<storage-account-name>
met de naam van een ADLS Gen2-opslagaccount.<eh-namespace>
met de naam van uw Event Hubs-naamruimte.<eh-policy-name>
met de geheime bereiksleutel voor de Event Hubs-beleidssleutel.<eventhub>
met de naam van uw Event Hubs-exemplaar.<secret-scope-name>
met de naam van het azure Databricks-geheime bereik dat de Event Hubs-beleidssleutel bevat.
Als best practice gebruikt deze pijplijn niet het standaard DBFS-opslagpad, maar wordt in plaats daarvan een Azure Data Lake Storage Gen2-opslagaccount (ADLS Gen2) gebruikt. Zie Veilig toegang krijgen tot opslagreferenties met geheimen in een pijplijn voor meer informatie over het configureren van verificatie voor een ADLS Gen2-opslagaccount.