Sdílet prostřednictvím


Použití služby Azure Event Hubs jako zdroje dat Delta Live Tables

Tento článek vysvětluje, jak pomocí Delta Live Tables zpracovávat zprávy z Azure Event Hubs. Konektor Structured Streaming Event Hubs nemůžete použít, protože tato knihovna není k dispozici jako součást prostředí Databricks Runtime a Delta Live Tables neumožňují použití knihoven JVM třetích stran.

Jak se mohou Delta Live Tables připojit ke službě Azure Event Hubs?

Azure Event Hubs poskytuje koncový bod kompatibilní s Apache Kafka, který můžete použít s konektorem Kafka strukturovaného streamování, který je k dispozici v Databricks Runtime, ke zpracování zpráv ze služby Azure Event Hubs. Další informace o kompatibilitě služby Azure Event Hubs a Apache Kafka najdete v tématu Použití služby Azure Event Hubs z aplikací Apache Kafka.

Následující kroky popisují připojení kanálu Delta Live Tables k existující instanci služby Event Hubs a využívání událostí z tématu. K provedení těchto kroků potřebujete následující hodnoty připojení služby Event Hubs:

  • Název oboru názvů služby Event Hubs.
  • Název instance centra událostí v oboru názvů Event Hubs.
  • Název zásady sdíleného přístupu a klíč zásad pro službu Event Hubs. Ve výchozím nastavení RootManageSharedAccessKey se pro každý obor názvů služby Event Hubs vytvoří zásada. Tato zásada má managesend oprávnění a listen oprávnění. Pokud váš kanál čte jenom ze služby Event Hubs, doporučuje Databricks vytvořit novou zásadu pouze s oprávněním pro naslouchání.

Další informace o připojovacím řetězci služby Event Hubs najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Poznámka:

  • Azure Event Hubs poskytuje možnosti OAuth 2.0 i sdíleného přístupového podpisu (SAS) pro autorizaci přístupu k vašim zabezpečeným prostředkům. Tyto pokyny používají ověřování založené na SAS.
  • Pokud získáte z Azure Portalu připojovací řetězec služby Event Hubs, nemusí obsahovat hodnotu EntityPath. Tato EntityPath hodnota se vyžaduje pouze při použití konektoru Event Hubs strukturovaného streamování. Použití konektoru Kafka strukturovaného streamování vyžaduje zadání pouze názvu tématu.

Uložení klíče zásad do tajného kódu Azure Databricks

Protože klíč zásad je citlivé informace, Databricks doporučuje nezakódovat hodnotu v kódu kanálu. Místo toho použijte tajné kódy Azure Databricks k ukládání a správě přístupu ke klíči.

Následující příklad používá rozhraní příkazového řádku Databricks k vytvoření oboru tajného kódu a uložení klíče do tohoto oboru tajného kódu. V kódu kanálu použijte dbutils.secrets.get() funkci s scope-name hodnotou klíče a shared-policy-name načtěte ji.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Další informace o tajných klíčích Azure Databricks najdete v tématu Správa tajných kódů.

Vytvoření poznámkového bloku a přidání kódu kanálu pro zpracování událostí

Následující příklad čte události IoT z tématu, ale můžete přizpůsobit příklad požadavkům vaší aplikace. Jako osvědčený postup doporučuje Databricks používat nastavení pipeline Delta Live Tables pro konfiguraci proměnných aplikace. Kód potrubí pak pomocí funkce spark.conf.get() načte hodnoty. Další informace o použití nastavení potrubí k parametrizaci vašeho potrubí najdete v tématu Použití parametrů s potrubími Delta Live Tables.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Vytvoření kanálu

Vytvořte nový kanál s následujícím nastavením a nahraďte zástupné hodnoty odpovídajícími hodnotami pro vaše prostředí.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Nahradit

  • <container-name> s názvem kontejneru účtu úložiště Azure.
  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <eh-namespace> nahraďte názvem vašeho oboru názvů Event Hubs.
  • <eh-policy-name> klíč oboru tajného klíče pro klíč zásad služby Event Hubs.
  • <eventhub> s názvem vaší instance služby Event Hubs.
  • <secret-scope-name> s názvem oboru tajných kódů Azure Databricks, který obsahuje klíč zásad služby Event Hubs.

Osvědčeným postupem je, že tento kanál nepoužívá výchozí cestu k úložišti DBFS, ale místo toho používá účet úložiště Azure Data Lake Storage Gen2 (ADLS Gen2). Další informace o konfiguraci ověřování pro účet úložiště ADLS Gen2 najdete v tématu Zabezpečený přístup k přihlašovacím údajům úložiště s tajnými kódy v kanálu.