Sdílet prostřednictvím


Použití služby Azure Event Hubs jako zdroje dat DLT

Tento článek vysvětluje, jak pomocí DLT zpracovávat zprávy ze služby Azure Event Hubs. Konektor Structured Streaming Event Hubs nelze použít, protože tato knihovna není k dispozici jako součást prostředí Databricks Runtime a DLT neumožňuje použití knihoven JVM třetích stran.

Jak se může DLT připojit ke službě Azure Event Hubs?

Azure Event Hubs poskytuje koncový bod kompatibilní s Apache Kafka, který můžete použít s konektorem Structured Streaming Kafka, který je k dispozici v Databricks Runtime, ke zpracování zpráv ze služby Azure Event Hubs. Další informace o kompatibilitě služby Azure Event Hubs a Apache Kafka najdete v tématu Použití služby Azure Event Hubs z aplikací Apache Kafka.

Následující kroky popisují připojení kanálu DLT k existující instanci služby Event Hubs a využívání událostí z tématu. K provedení těchto kroků potřebujete následující hodnoty připojení služby Event Hubs:

  • Název prostoru názvů Event Hubs.
  • Název instance centra událostí ve jmenném prostoru Event Hubs.
  • Název zásady sdíleného přístupu a klíč zásad pro službu Event Hubs. Ve výchozím nastavení se pro každý obor názvů služby Event Hubs vytvoří zásada RootManageSharedAccessKey. Tato zásada má oprávnění manage, send a listen. Pokud váš datový kanál čte pouze ze služby Event Hubs, Databricks doporučuje vytvořit novou zásadu pouze s oprávněním pro naslouchání.

Další informace o připojovacím řetězci služby Event Hubs najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Poznámka

  • Azure Event Hubs poskytuje možnosti OAuth 2.0 i sdíleného přístupového podpisu (SAS) pro autorizaci přístupu k vašim zabezpečeným prostředkům. Tyto pokyny používají ověřování založené na SAS.
  • Pokud z Azure portálu získáte připojovací řetězec služby Event Hubs, nemusí obsahovat hodnotu EntityPath. Hodnota EntityPath se vyžaduje pouze při použití konektoru Azure Event Hubs pro strukturované streamování. Použití konektoru Kafka strukturovaného streamování vyžaduje zadání pouze názvu tématu.

Uložte klíč zásad do tajemství Azure Databricks

Protože klíč politiky je citlivá informace, Databricks doporučuje nehardcodovat tuto hodnotu ve vašem kódu pipeliny. Místo toho použijte tajné kódy Azure Databricks k ukládání a správě přístupu ke klíči.

Následující příklad používá rozhraní příkazového řádku Databricks k vytvoření oboru tajného kódu a uložení klíče do tohoto oboru tajného kódu. V kódu pipeline použijte funkci dbutils.secrets.get() s scope-name a shared-policy-name k načtení hodnoty klíče.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Další informace o tajných klíčích Azure Databricks najdete v tématu správa tajných kódů.

Vytvořte notebook a přidejte pipeline kód ke zpracování událostí

Následující příklad čte události IoT z tématu, ale můžete přizpůsobit příklad požadavkům vaší aplikace. Jako osvědčený postup doporučuje Databricks ke konfiguraci proměnných aplikace použít nastavení kanálu DLT. Pak kód pipeliny využije funkci spark.conf.get() k načtení hodnot. Další informace o použití nastavení kanálu k parametrizaci kanálu naleznete v tématu Použití parametrů s kanály DLT.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Vytvořit pipeline

Vytvořte nový kanál s následujícím nastavením a nahraďte zástupné hodnoty odpovídajícími hodnotami pro vaše prostředí.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Nahradit

  • <container-name> s názvem kontejneru účtu úložiště Azure.
  • <storage-account-name> s názvem účtu úložiště ADLS Gen2.
  • <eh-namespace> s názvem vašeho oboru názvů služby Event Hubs.
  • <eh-policy-name> s klíčem oboru tajného kódu pro klíč zásad služby Event Hubs.
  • <eventhub> s názvem vaší instance služby Event Hubs.
  • <secret-scope-name> s názvem tajného oboru Azure Databricks, který obsahuje klíč zásad služby Event Hubs.

Osvědčeným postupem je, že tento kanál nepoužívá výchozí cestu k úložišti DBFS, ale místo toho používá účet úložiště Azure Data Lake Storage Gen2 (ADLS Gen2). Další informace o konfiguraci ověřování pro účet úložiště ADLS Gen2 najdete v tématu Zabezpečený přístup k přihlašovacím údajům úložiště s tajnými kódy v kanálu.