Dela via


Använda Azure Event Hubs som en DLT-datakälla

Den här artikeln beskriver hur du använder DLT för att bearbeta meddelanden från Azure Event Hubs. Du kan inte använda anslutningsappen Structured Streaming Event Hubs eftersom det här biblioteket inte är tillgängligt som en del av Databricks Runtime, och DLT-tillåter inte att du använder JVM-bibliotek från tredje part.

Hur kan DLT ansluta till Azure Event Hubs?

Azure Event Hubs tillhandahåller en slutpunkt som är kompatibel med Apache Kafka som du kan använda med Structured Streaming Kafka Connector, tillgänglig i Databricks Runtime, för att bearbeta meddelanden från Azure Event Hubs. Mer information om Azure Event Hubs och Apache Kafka-kompatibilitet finns i Använda Azure Event Hubs från Apache Kafka-program.

Följande steg beskriver hur du ansluter en DLT-pipeline till en befintlig Event Hubs-instans och använder händelser från ett ämne. För att slutföra de här stegen behöver du följande event hubs-anslutningsvärden:

  • Namnet på Event Hubs-namnområdet.
  • Namnet på Event Hub-instansen i Event Hubs-namnområdet.
  • Ett namn på en princip för delad åtkomst och en principnyckel för Event Hubs. Som standard skapas en RootManageSharedAccessKey princip för varje Event Hubs-namnområde. Den här principen har behörigheterna manage, send och listen. Om din pipeline bara läser från Event Hubs rekommenderar Databricks att du skapar en ny princip med endast lyssningsbehörighet.

Mer information om anslutningssträngen för Event Hubs finns i Hämta en anslutningssträng för Event Hubs.

Notera

  • Azure Event Hubs innehåller alternativ för både OAuth 2.0 och signatur för delad åtkomst (SAS) för att auktorisera åtkomst till dina säkra resurser. Dessa instruktioner använder SAS-baserad autentisering.
  • Om du får anslutningssträngen Event Hubs från Azure-portalen kanske den inte innehåller värdet EntityPath. Värdet EntityPath krävs endast när du använder Structured Streaming Event Hubs-anslutningen. Om du använder kafka-anslutningsappen för strukturerad direktuppspelning måste du endast ange ämnesnamnet.

Lagra principnyckeln i en Azure Databricks-hemlighet

Eftersom principnyckeln är känslig information rekommenderar Databricks att du inte hårdkodar värdet i pipelinekoden. Använd i stället Azure Databricks-hemligheter för att lagra och hantera åtkomst till nyckeln.

I följande exempel används Databricks CLI för att skapa ett hemligt omfång och lagra nyckeln i det hemliga omfånget. I pipelinekoden använder du funktionen dbutils.secrets.get() med scope-name och shared-policy-name för att hämta nyckelvärdet.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Mer information om Azure Databricks-hemligheter finns i Hemlighetshantering.

Skapa en notebook-fil och lägg till pipelinekoden för att använda händelser

I följande exempel läss IoT-händelser från ett ämne, men du kan anpassa exemplet efter kraven för ditt program. Som bästa praxis rekommenderar Databricks att du använder DLT-pipelineinställningarna för att konfigurera programvariabler. Sedan använder pipelinekoden funktionen spark.conf.get() för att hämta värden. Mer information om hur du använder pipelineinställningar för att parameterisera din pipeline finns i Använda parametrar med DLT-pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Skapa pipelinen

Skapa en ny pipeline med följande inställningar och ersätt platshållarvärdena med lämpliga värden för din miljö.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Ersätta

  • <container-name> med namnet på en Azure Storage-kontocontainer.
  • <storage-account-name> med namnet på ett ADLS Gen2-lagringskonto.
  • <eh-namespace> med namnet på ditt Event Hubs-namnområde.
  • <eh-policy-name> med den hemliga åtkomstnyckeln för policy för Event Hubs.
  • <eventhub> med namnet på din Event Hubs-instans.
  • <secret-scope-name> med namnet på Azure Databricks-hemlighetsomfånget som innehåller principnyckeln för Event Hubs.

Som bästa praxis använder den här pipelinen inte standardlagringssökvägen för DBFS, utan använder i stället ett Azure Data Lake Storage Gen2-lagringskonto (ADLS Gen2). Mer information om hur du konfigurerar autentisering för ett lagringskonto i ADLS Gen2 finns under säkra åtkomstuppgifter genom att använda hemligheter i en pipeline.