Usar os Hubs de Eventos do Azure como uma fonte de dados do Delta Live Tables
Este artigo explica como usar o Delta Live Tables para processar mensagens dos Hubs de Eventos do Azure. Não é possível usar o conector Structured Streaming Event Hubs porque essa biblioteca não está disponível como parte do Databricks Runtime e o Delta Live Tables não permite que você use bibliotecas JVM de terceiros.
Como o Delta Live Tables pode se conectar aos Hubs de Eventos do Azure?
Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o conector Kafka de Streaming Estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. Para obter mais informações sobre Hubs de Eventos do Azure e compatibilidade com Apache Kafka, consulte Usar Hubs de Eventos do Azure de aplicativos Apache Kafka.
As seguintes etapas descrevem como ligar um pipeline Delta Live Tables a uma instância existente de Event Hubs e como consumir eventos de um tópico. Para concluir essas etapas, você precisa dos seguintes valores de conexão de Hubs de Eventos:
- O nome do namespace Hubs de Eventos.
- O nome da instância do Hub de Eventos no namespace Hubs de Eventos.
- Um nome de política de acesso compartilhado e uma chave de política para Hubs de Eventos. Por padrão, uma
RootManageSharedAccessKey
política é criada para cada namespace de Hubs de Eventos. Esta política temmanage
esend
listen
permissões. Se o pipeline ler apenas de Hubs de Eventos, o Databricks recomenda a criação de uma nova política apenas com permissão de escuta.
Para obter mais informações sobre a cadeia de ligação dos Hubs de Eventos, consulte Obter uma cadeia de ligação dos Hubs de Eventos.
Nota
- Os Hubs de Eventos do Azure fornecem opções OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso aos seus recursos seguros. Estas instruções usam autenticação baseada em SAS.
- Se você obter a cadeia de conexão Hubs de Eventos do portal do Azure, ela pode não conter o valor
EntityPath
. OEntityPath
valor é necessário somente ao usar o conector Hubs de Eventos de Streaming Estruturado. Usar o Structured Streaming Kafka Connector requer fornecer apenas o nome do tópico.
Armazenar a chave de política em um segredo do Azure Databricks
Como a chave de política é uma informação confidencial, o Databricks recomenda não codificar o valor no código do pipeline. Em vez disso, use os segredos do Azure Databricks para armazenar e gerenciar o acesso à chave.
O exemplo a seguir usa a CLI do Databricks para criar um escopo secreto e armazenar a chave nesse escopo secreto. No código de pipeline, use a dbutils.secrets.get()
função com o scope-name
e shared-policy-name
para recuperar o valor da chave.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Para obter mais informações sobre segredos do Azure Databricks, consulte Gerenciamento de segredos.
Criar um bloco de anotações e adicionar o código de pipeline para consumir eventos
O exemplo a seguir lê eventos de IoT de um tópico, mas você pode adaptar o exemplo para os requisitos do seu aplicativo. Como prática recomendada, o Databricks recomenda o uso das configurações do pipeline Delta Live Tables para configurar variáveis de aplicativo. Em seguida, o código do pipeline usa a função spark.conf.get()
para recuperar valores. Para obter mais informações sobre como usar as configurações de pipeline para parametrizar o seu pipeline, consulte Usar parâmetros com pipelines Delta Live Tables.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Criar o pipeline
Crie uma nova pipeline com as seguintes configurações, substituindo os valores de espaço reservado por valores apropriados para o seu ambiente.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
-
<container-name>
com o nome de um contêiner de conta de armazenamento do Azure. -
<storage-account-name>
com o nome de uma conta de armazenamento ADLS Gen2. -
<eh-namespace>
pelo nome do espaço de nomes dos Hubs de Eventos. -
<eh-policy-name>
com a chave de escopo secreta para a chave de política dos Hubs de Eventos. -
<eventhub>
com o nome da instância dos Hubs de Eventos. -
<secret-scope-name>
com o nome do escopo secreto do Azure Databricks que contém a chave de política dos Hubs de Eventos.
Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS padrão, mas usa uma conta de armazenamento do Azure Data Lake Storage Gen2 (ADLS Gen2). Para obter mais informações sobre como configurar a autenticação para uma conta de armazenamento ADLS Gen2, consulte Acessar com segurança as credenciais de armazenamento utilizando segredos num pipeline.