Usare Azure Event Hubs come fonte dati DLT
Questo articolo illustra come usare DLT per elaborare i messaggi da Hub eventi di Azure. Non è possibile utilizzare il connettore Structured Streaming Event Hubs perché questa libreria non è disponibile come parte di Databricks Runtime e DLT non permette l'uso di librerie JVM di terze parti.
In che modo DLT è possibile connettersi a Hub eventi di Azure?
Hub eventi di Azure offre un endpoint compatibile con Apache Kafka che è possibile usare con il connettore Kafka structured streaming , disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Per altre informazioni sulla compatibilità di Hub eventi di Azure e Apache Kafka, vedere Usare Hub eventi di Azure dalle applicazioni Apache Kafka.
I passaggi seguenti descrivono la connessione di una pipeline DLT a un'istanza di Hub eventi esistente e l'utilizzo di eventi da un argomento. Per completare questi passaggi, sono necessari i valori di connessione di Hub eventi seguenti:
- Nome dello spazio dei nomi di Event Hubs.
- Nome dell'istanza di Event Hub nello spazio dei nomi di Event Hubs.
- Nome e chiave dei criteri di accesso condiviso per Event Hubs. Per impostazione predefinita, viene creato un criterio di
RootManageSharedAccessKey
per ogni spazio dei nomi di Event Hub. Questa politica dispone di autorizzazionimanage
,send
elisten
. Se la pipeline legge solo da Event Hubs, Databricks consiglia di creare un nuovo criterio con la sola autorizzazione all'ascolto.
Per ulteriori informazioni sulla stringa di connessione degli Hub Eventi, vedi Consulta una stringa di connessione degli Hub Eventi.
Nota
- Hub eventi di Azure offre opzioni di firma di accesso condiviso (SAS) e OAuth 2.0 per autorizzare l'accesso alle risorse protette. Queste istruzioni usano l'autenticazione basata su SAS.
- Se si ottiene la stringa di connessione di Hub eventi dal portale di Azure, potrebbe non contenere il valore
EntityPath
. Il valoreEntityPath
è obbligatorio solo quando si usa il connettore di Event Hubs di streaming strutturato. L'uso del connettore Kafka Structured Streaming richiede solo il nome dell'argomento.
Archiviare la chiave dei criteri in un segreto di Azure Databricks
Poiché la chiave della politica è un'informazione sensibile, Databricks consiglia di non hardcodificare il valore nel codice della pipeline. Usare invece i segreti di Azure Databricks per archiviare e gestire l'accesso alla chiave.
Nell'esempio seguente viene usata l'interfaccia della riga di comando di Databricks per creare un ambito segreto e archiviare la chiave nell'ambito del segreto. Nel codice della pipeline usare la funzione dbutils.secrets.get()
con il scope-name
e shared-policy-name
per recuperare il valore della chiave.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Per altre informazioni sui segreti di Azure Databricks, vedere Secret management.
Creare un notebook e aggiungere il codice della pipeline per utilizzare gli eventi
L'esempio seguente legge gli eventi IoT da un argomento, ma è possibile adattare l'esempio per i requisiti dell'applicazione. Come procedura consigliata, Databricks consiglia di usare le impostazioni della pipeline DLT per configurare le variabili dell'applicazione. Il codice della pipeline usa quindi la funzione spark.conf.get()
per recuperare i valori. Per ulteriori informazioni sull'uso delle impostazioni della pipeline per parametrizzare la pipeline, consulta Usare i parametri con le pipeline DLT.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Creare la pipeline
Creare una nuova pipeline con le impostazioni seguenti, sostituendo i valori segnaposto con i valori appropriati per l'ambiente in uso.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Sostituire
-
<container-name>
come nome di un contenitore dell'account di archiviazione di Azure. -
<storage-account-name>
con il nome di un account di archiviazione ADLS Gen2. -
<eh-namespace>
con il nome dello spazio dei nomi Event Hubs. -
<eh-policy-name>
con la chiave dell'ambito segreto per la chiave della politica di Hub eventi. -
<eventhub>
con il nome dell'istanza di Event Hubs. -
<secret-scope-name>
con il nome dell'ambito segreto di Azure Databricks che contiene la chiave della policy di Event Hubs.
Come procedura consigliata, questa pipeline non usa il percorso di archiviazione DBFS predefinito, ma usa invece un account di archiviazione di Azure Data Lake Storage Gen2 (ADLS Gen2). Per altre informazioni sulla configurazione dell'autenticazione per un account di archiviazione ADLS Gen2, vedere Accedere in modo sicuro alle credenziali di archiviazione con segreti in una pipeline.