Dela via


Använda Azure Event Hubs som en Delta Live Tables-datakälla

Den här artikeln beskriver hur du använder Delta Live Tables för att bearbeta meddelanden från Azure Event Hubs. Du kan inte använda Structured Streaming Event Hubs-anslutningsappen eftersom det här biblioteket inte är tillgängligt som en del av Databricks Runtime, och Delta Live Tables tillåter inte att du använder JVM-bibliotek från tredje part.

Hur kan Delta Live Tables ansluta till Azure Event Hubs?

Azure Event Hubs tillhandahåller en slutpunkt som är kompatibel med Apache Kafka som du kan använda med Kafka-anslutningsappen för strukturerad direktuppspelning, tillgänglig i Databricks Runtime, för att bearbeta meddelanden från Azure Event Hubs. Mer information om Azure Event Hubs och Apache Kafka-kompatibilitet finns i Använda Azure Event Hubs från Apache Kafka-program.

Följande steg beskriver hur du ansluter en Delta Live Tables-pipeline till en befintlig Event Hubs-instans och använder händelser från ett ämne. För att slutföra de här stegen behöver du följande event hubs-anslutningsvärden:

  • Namnet på Event Hubs-namnområdet.
  • Namnet på Event Hub-instansen i Event Hubs-namnområdet.
  • Ett namn på en princip för delad åtkomst och en principnyckel för Event Hubs. Som standard skapas en RootManageSharedAccessKey princip för varje Event Hubs-namnområde. Den här principen har manage, send och listen behörigheter. Om din pipeline bara läser från Event Hubs rekommenderar Databricks att du skapar en ny princip med endast lyssningsbehörighet.

Mer information om anslutningssträngen för Event Hubs finns i Hämta en händelsehubbanslutningssträng.

Kommentar

  • Azure Event Hubs innehåller alternativ för både OAuth 2.0 och signatur för delad åtkomst (SAS) för att auktorisera åtkomst till dina säkra resurser. Dessa instruktioner använder SAS-baserad autentisering.
  • Om du får anslutningssträngen Event Hubs från Azure-portalen kanske den inte innehåller värdet EntityPath. Värdet EntityPath krävs endast när du använder anslutningsappen För strukturerad direktuppspelning. Om du använder kafka-anslutningsappen för strukturerad direktuppspelning måste du endast ange ämnesnamnet.

Lagra principnyckeln i en Azure Databricks-hemlighet

Eftersom principnyckeln är känslig information rekommenderar Databricks att du inte hårdkodar värdet i pipelinekoden. Använd i stället Azure Databricks-hemligheter för att lagra och hantera åtkomst till nyckeln.

I följande exempel används Databricks CLI för att skapa ett hemligt omfång och lagra nyckeln i det hemliga omfånget. I pipelinekoden använder du dbutils.secrets.get() funktionen med scope-name och shared-policy-name för att hämta nyckelvärdet.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Mer information om Azure Databricks-hemligheter finns i Hemlig hantering.

Skapa en notebook-fil och lägg till pipelinekoden för att använda händelser

I följande exempel läss IoT-händelser från ett ämne, men du kan anpassa exemplet efter kraven för ditt program. Som bästa praxis rekommenderar Databricks att du använder pipelineinställningarna för Delta Live Tables för att konfigurera programvariabler. Sedan använder pipelinekoden funktionen spark.conf.get() för att hämta värden. Mer information om hur du använder pipelineinställningar för att parametrisera din pipeline finns i Använda parametrar med Delta Live Tables-pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Skapa pipelinen

Skapa en ny pipeline med följande inställningar och ersätt platshållarvärdena med lämpliga värden för din miljö.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> med namnet på en Azure Storage-kontocontainer.
  • <storage-account-name> med namnet på ett ADLS Gen2-lagringskonto.
  • <eh-namespace> med namnet på Event Hubs-namnrymden.
  • <eh-policy-name> med den hemliga omfångsnyckeln för principnyckeln för Event Hubs.
  • <eventhub> med namnet på din Event Hubs-instans.
  • <secret-scope-name> med namnet på det hemliga Azure Databricks-omfånget som innehåller principnyckeln för Event Hubs.

Som bästa praxis använder den här pipelinen inte standardlagringssökvägen för DBFS, utan använder i stället ett Azure Data Lake Storage Gen2-lagringskonto (ADLS Gen2). För mer information om hur du konfigurerar autentisering för ett ADLS Gen2-lagringskonto, se Säker åtkomst till lagringsuppgifter med hjälp av hemligheter i en pipeline.