共用方式為


使用 Azure 事件中樞作為 DLT 數據源

本文說明如何使用 DLT 來處理來自 Azure 事件中樞的訊息。 您無法使用 結構化串流事件中樞連接器,因為此連結庫無法作為 Databricks Runtime 的一部分使用,而 DLT 不允許使用第三方 JVM 連結庫

DLT 如何連線到 Azure 事件中樞?

Azure 事件中樞提供與 Apache Kafka 相容的端點,您可以搭配 結構化串流 Kafka 連接器,可在 Databricks Runtime 中使用,以處理來自 Azure 事件中樞的訊息。 如需 Azure 事件中樞和 Apache Kafka 相容性的詳細資訊,請參閱 從 Apache Kafka 應用程式使用 Azure 事件中樞

下列步驟說明將 DLT 管線連線至現有的事件中樞實例,以及從主題取用事件。 若要完成這些步驟,您需要下列事件中樞連線值:

  • 事件中樞命名空間的名稱。
  • 事件中樞命名空間中事件中樞實例的名稱。
  • 事件中樞的共用存取原則名稱和原則密鑰。 根據預設,會為每個事件中樞命名空間建立 RootManageSharedAccessKey 原則。 此原則具有 managesendlisten 權限。 如果您的管線只從事件中心讀取,Databricks 建議只建立具有接聽權限的新策略。

如需事件中樞連接字串的詳細資訊,請參閱 取得事件中樞連接字串

注意

  • Azure 事件中樞同時提供 OAuth 2.0 和共用存取簽章 (SAS) 選項,以授權存取您的安全資源。 這些指示會使用SAS型驗證。
  • 如果您從 Azure 入口網站取得事件中樞連接字串,它可能不會包含 EntityPath 值。 只有在使用結構化串流事件中樞連接器時,才需要 EntityPath 值。 使用結構化串流 Kafka 連接器只需要提供主題名稱。

將原則金鑰儲存在 Azure Databricks 秘密中

因為政策金鑰是敏感資訊,因此 Databricks 建議不要將值硬編碼於管線代碼中。 請改用 Azure Databricks 秘密來儲存和管理密鑰的存取權。

下列範例會使用 Databricks CLI 來建立秘密範圍,並將密鑰儲存在該秘密範圍中。 在您的管線代碼中,使用 dbutils.secrets.get() 函式搭配 scope-nameshared-policy-name 來擷取鍵值。

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

如需 Azure Databricks 秘密的詳細資訊,請參閱 秘密管理

建立筆記本並新增工作流程代碼以接收事件

下列範例會從主題讀取IoT事件,但您可以針對應用程式的需求調整範例。 最佳做法是 Databricks 建議使用 DLT 管線設定來設定應用程式變數。 管線程式代碼接著會使用 spark.conf.get() 函式來擷取值。 如需使用管線設定將管線參數化的詳細資訊,請參閱 搭配 DLT 管線使用參數

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

建立管線

使用下列設定建立新的管道,並將其中的佔位符值替換為您環境中的適當值。

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

取代

  • <container-name> 具有 Azure 記憶體帳戶容器的名稱。
  • <storage-account-name> 作為 ADLS Gen2 儲存帳戶的名稱。
  • <eh-namespace> 事件中樞命名空間的名稱。
  • <eh-policy-name> 用於事件中樞政策密鑰的秘密範圍金鑰。
  • <eventhub>替換為您的事件中樞實例名稱。
  • <secret-scope-name> 是包含事件中樞原則密鑰的 Azure Databricks 機密範圍名稱。

最佳做法是,此管線不會使用預設 DBFS 記憶體路徑,而是改用 Azure Data Lake Storage Gen2 (ADLS Gen2) 記憶體帳戶。 如需設定 ADLS Gen2 記憶體帳戶驗證的詳細資訊,請參閱 使用管線中的秘密安全地存取記憶體認證