Udostępnij za pośrednictwem


Używanie usługi Azure Event Hubs jako źródła danych DLT

W tym artykule wyjaśniono, jak używać biblioteki DLT do przetwarzania komunikatów z usługi Azure Event Hubs. Nie można użyć łącznika Strukturyzowanego przesyłania strumieniowego usługi Event Hubs, ponieważ ta biblioteka nie jest dostępna w ramach środowiska Databricks Runtime, a DLT nie zezwala na używanie bibliotek JVM innych firm.

Jak można połączyć się z usługą Azure Event Hubs za pomocą biblioteki DLT?

Usługa Azure Event Hubs zapewnia punkt końcowy zgodny z Apache Kafka, którego można używać z łącznikiem Kafka do Strukturalnego Przesyłania Strumieniowego , dostępnym w środowisku Databricks Runtime, do przetwarzania komunikatów z usługi Azure Event Hubs. Aby uzyskać więcej informacji na temat zgodności usług Azure Event Hubs i Apache Kafka, zobacz Use Azure Event Hubs from Apache Kafka applications(Korzystanie z usługi Azure Event Hubs z poziomu aplikacji platformy Apache Kafka).

W poniższych krokach opisano łączenie potoku DLT z istniejącym wystąpieniem usługi Event Hubs i pobieranie zdarzeń z konkretnego tematu. Do wykonania tych kroków potrzebne są następujące wartości połączenia usługi Event Hubs:

  • Nazwa przestrzeni nazw usługi Event Hubs.
  • Nazwa wystąpienia centrum zdarzeń w przestrzeni nazw usługi Event Hubs.
  • Nazwa zasad dostępu współdzielonego i klucz zasad dla usługi Event Hubs. Domyślnie dla każdej przestrzeni nazw usługi Event Hubs jest tworzona polityka RootManageSharedAccessKey. Ta polityka ma ustawienia dostępu manage, send i listen. Jeśli potok odczytuje tylko z usługi Event Hubs, usługa Databricks zaleca utworzenie nowej polityki z uprawnieniami do nasłuchiwania.

Aby uzyskać więcej informacji na temat parametrów połączenia usługi Event Hubs, zobacz Pobieranie parametrów połączenia usługi Event Hubs.

Notatka

  • Usługa Azure Event Hubs udostępnia zarówno opcje protokołu OAuth 2.0, jak i sygnatury dostępu współdzielonego (SAS), aby autoryzować dostęp do bezpiecznych zasobów. Te instrukcje korzystają z uwierzytelniania opartego na SAS.
  • Jeśli otrzymasz parametry połączenia usługi Event Hubs z witryny Azure Portal, może nie zawierać wartości EntityPath. Wartość EntityPath jest wymagana tylko w przypadku korzystania z łącznika Event Hubs Structured Streaming. Użycie łącznika Kafka Structured Streaming wymaga podania tylko nazwy tematu.

Przechowaj klucz zasad w sekrecie usługi Azure Databricks

Ponieważ klucz polityki to poufne informacje, firma Databricks zaleca, aby nie zapisywać jego wartości bezpośrednio w kodzie potoku. Zamiast tego użyj wpisów tajnych usługi Azure Databricks, aby przechowywać klucz i zarządzać dostępem do niego.

W poniższym przykładzie użyto Databricks CLI do utworzenia zakresu tajemnicy i zapisania klucza w tym zakresie tajemnicy. W kodzie potoku użyj funkcji dbutils.secrets.get() razem z scope-name i shared-policy-name, aby pobrać kluczową wartość.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Aby uzyskać więcej informacji na temat tajemnic w Azure Databricks, odwiedź zarządzanie tajemnicami.

Utwórz notes i dodaj kod potoku do obsługi zdarzeń

Poniższy przykład odczytuje zdarzenia IoT z tematu, ale możesz dostosować przykład wymagań aplikacji. Najlepszym rozwiązaniem jest użycie ustawień potoku DLT w celu skonfigurowania zmiennych aplikacji. Kod potokowy następnie używa funkcji spark.conf.get() do pobierania wartości. Aby uzyskać więcej informacji na temat używania ustawień pipeline do sparametryzowania pipeline, zobacz Użyj parametrów z pipeline DLT.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Utwórz potok

Utwórz nowy potok przy użyciu następujących ustawień, zastępując wartości symboli zastępczych odpowiednimi wartościami dla danego środowiska.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Zamień

  • <container-name> z nazwą kontenera konta usługi Azure Storage.
  • <storage-account-name> z nazwą konta magazynu Azure Data Lake Storage Gen2 (ADLS Gen2).
  • <eh-namespace> z nazwą przestrzeni nazw usługi Event Hubs.
  • <eh-policy-name> z tajnym kluczem zakresu dla klucza polityki Event Hubs.
  • <eventhub> z nazwą wystąpienia usługi Event Hubs.
  • <secret-scope-name> z nazwą tajnego zakresu usługi Azure Databricks, który zawiera klucz polityki Event Hubs.

Najlepszym rozwiązaniem jest to, że ten potok nie używa domyślnej ścieżki magazynu systemu plików DBFS, ale zamiast tego używa konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2). Aby uzyskać więcej informacji o konfigurowaniu uwierzytelniania dla konta magazynu ADLS Gen2, zobacz Bezpieczny dostęp do poświadczeń magazynu za pomocą tajemnic w potoku.