Delen via


Azure Event Hubs gebruiken als een DLT-gegevensbron

In dit artikel wordt uitgelegd hoe u DLT gebruikt om berichten van Azure Event Hubs te verwerken. U kunt de Structured Streaming Event Hubs-connector niet gebruiken omdat deze bibliotheek niet beschikbaar is als onderdeel van Databricks Runtime en DLT-niet toestaat dat u JVM-bibliotheken van derdenkunt gebruiken.

Hoe kan DLT verbinding maken met Azure Event Hubs?

Azure Event Hubs biedt een eindpunt dat compatibel is met Apache Kafka die u kunt gebruiken met de Structured Streaming Kafka-connector, beschikbaar in Databricks Runtime, om berichten van Azure Event Hubs te verwerken. Zie Azure Event Hubs gebruiken vanuit Apache Kafka-toepassingenvoor meer informatie over Azure Event Hubs en Apache Kafka-compatibiliteit.

In de volgende stappen wordt beschreven hoe u een DLT-pijplijn verbindt met een bestaand Event Hubs-exemplaar en gebeurtenissen uit een onderwerp gebruikt. U hebt de volgende Event Hubs-verbindingswaarden nodig om deze stappen uit te voeren:

  • De naam van de Event Hubs-naamruimte.
  • De naam van het Event Hub-exemplaar in de Event Hubs-naamruimte.
  • Een naam en beleidssleutel voor gedeeld toegangsbeleid voor Event Hubs. Standaard wordt er een RootManageSharedAccessKey-beleid gemaakt voor elke Event Hubs-naamruimte. Dit beleid heeft manage, send en listen toestemmingen. Als uw pijplijn alleen leest uit Event Hubs, raadt Databricks aan om een nieuwe policyregel met alleen luistermachtiging te maken.

Voor meer informatie over de Event Hubs-verbindingsreeks, zie Een Event Hubs-verbindingsreeks ophalen.

Notitie

  • Azure Event Hubs biedt zowel OAuth 2.0- als SAS-opties (Shared Access Signature) om toegang tot uw beveiligde resources te autoriseren. In deze instructies wordt verificatie op basis van SAS gebruikt.
  • Als u de Event Hubs-verbindingsreeks uit Azure Portal krijgt, bevat deze mogelijk niet de EntityPath waarde. De EntityPath-waarde is alleen vereist wanneer u de Event Hubs-connector voor gestructureerd streamen gebruikt. Voor het gebruik van de Structured Streaming Kafka-connector moet alleen de onderwerpnaam worden opgegeven.

De beleidssleutel opslaan in een Azure Databricks-geheim

Omdat een beleidssleutel gevoelige informatie is, raadt Databricks aan om de waarde niet hard te coderen in uw pijplijncode. Gebruik in plaats daarvan Azure Databricks-geheimen om de toegang tot de sleutel op te slaan en te beheren.

In het volgende voorbeeld wordt de Databricks CLI gebruikt om een geheim bereik te maken en de sleutel op te slaan in dat geheime bereik. Gebruik in de pijplijncode de functie dbutils.secrets.get() met de scope-name en shared-policy-name om de sleutelwaarde op te halen.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Zie Secret Managementvoor meer informatie over Azure Databricks-geheimen.

Maak een notebook en voeg de pijplijncode toe om gebeurtenissen te verwerken

In het volgende voorbeeld worden IoT-gebeurtenissen uit een onderwerp gelezen, maar u kunt het voorbeeld aanpassen voor de vereisten van uw toepassing. Als best practice raadt Databricks het gebruik van de DLT-pijplijninstellingen aan om toepassingsvariabelen te configureren. Uw pijplijncode gebruikt vervolgens de spark.conf.get() functie om waarden op te halen. Zie Parameters gebruiken met DLT-pijplijnenvoor meer informatie over het gebruik van pijplijninstellingen om uw pijplijn te parameteriseren.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

De pijplijn maken

Maak een nieuwe pijplijn met de volgende instellingen, waarbij u de waarden van de tijdelijke aanduiding vervangt door de juiste waarden voor uw omgeving.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Vervangen

  • <container-name> met de naam van een Azure Storage-accountcontainer.
  • <storage-account-name> met de naam van een ADLS Gen2-opslagaccount.
  • <eh-namespace> met de naam van uw Event Hubs-naamruimte.
  • <eh-policy-name> met de geheime bereiksleutel voor de Event Hubs-beleidssleutel.
  • <eventhub> met de naam van uw Event Hubs-instantie.
  • <secret-scope-name> met de naam van het Azure Databricks geheime bereik dat de Event Hubs-beleidssleutel bevat.

Als best practice gebruikt deze pijplijn niet het standaard DBFS-opslagpad, maar wordt in plaats daarvan een Azure Data Lake Storage Gen2-opslagaccount (ADLS Gen2) gebruikt. Zie Veilig toegang krijgen tot opslagreferenties met geheimen in een pijplijnvoor meer informatie over het configureren van verificatie voor een ADLS Gen2-opslagaccount.