Udostępnij za pośrednictwem


Używanie usługi Azure Event Hubs jako źródła danych delty tabel na żywo

W tym artykule wyjaśniono, jak używać usługi Delta Live Tables do przetwarzania komunikatów z usługi Azure Event Hubs. Nie można użyć łącznika usługi Event Hubs ze strukturą przesyłania strumieniowego, ponieważ ta biblioteka nie jest dostępna w ramach środowiska Databricks Runtime, a tabele delta live nie zezwalają na korzystanie z bibliotek JVM innych firm.

Jak usługa Delta Live Tables może łączyć się z usługą Azure Event Hubs?

Usługa Azure Event Hubs zapewnia punkt końcowy zgodny z platformą Apache Kafka, którego można używać z łącznikiem Platformy Kafka ze strukturą przesyłania strumieniowego, dostępnym w środowisku Databricks Runtime w celu przetwarzania komunikatów z usługi Azure Event Hubs. Aby uzyskać więcej informacji na temat zgodności usług Azure Event Hubs i Apache Kafka, zobacz Use Azure Event Hubs from Apache Kafka applications (Korzystanie z usługi Azure Event Hubs z aplikacji platformy Apache Kafka).

W poniższych krokach opisano łączenie potoku delty tabel na żywo z istniejącym wystąpieniem usługi Event Hubs i używanie zdarzeń z tematu. Do wykonania tych kroków potrzebne są następujące wartości połączenia usługi Event Hubs:

  • Nazwa przestrzeni nazw usługi Event Hubs.
  • Nazwa wystąpienia centrum zdarzeń w przestrzeni nazw usługi Event Hubs.
  • Nazwa zasad dostępu współdzielonego i klucz zasad dla usługi Event Hubs. Domyślnie RootManageSharedAccessKey zasady są tworzone dla każdej przestrzeni nazw usługi Event Hubs. Te zasady mają managesend uprawnienia i listen . Jeśli potok odczytuje tylko z usługi Event Hubs, usługa Databricks zaleca utworzenie nowych zasad z uprawnieniami nasłuchiwania.

Aby uzyskać więcej informacji na temat parametry połączenia usługi Event Hubs, zobacz Pobieranie parametry połączenia usługi Event Hubs.

Uwaga

  • Usługa Azure Event Hubs udostępnia zarówno opcje protokołu OAuth 2.0, jak i sygnatury dostępu współdzielonego (SAS), aby autoryzować dostęp do bezpiecznych zasobów. Te instrukcje korzystają z uwierzytelniania opartego na sygnaturze dostępu współdzielonego.
  • Jeśli otrzymasz parametry połączenia usługi Event Hubs z witryny Azure Portal, może ona nie zawierać EntityPath wartości. Wartość jest wymagana EntityPath tylko w przypadku korzystania z łącznika usługi Event Hubs przesyłania strumieniowego ze strukturą. Użycie łącznika Platformy Kafka Ze strukturą wymaga podania tylko nazwy tematu.

Przechowywanie klucza zasad w kluczu tajnym usługi Azure Databricks

Ponieważ klucz zasad jest poufnymi informacjami, usługa Databricks zaleca, aby nie zapisywać wartości w kodzie potoku. Zamiast tego użyj wpisów tajnych usługi Azure Databricks, aby przechowywać dostęp do klucza i zarządzać nim.

W poniższym przykładzie użyto interfejsu wiersza polecenia usługi Databricks do utworzenia zakresu wpisu tajnego i zapisania klucza w tym zakresie wpisów tajnych. W kodzie potoku użyj dbutils.secrets.get() funkcji i scope-name shared-policy-name , aby pobrać wartość klucza.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Aby uzyskać więcej informacji na temat wpisów tajnych usługi Azure Databricks, zobacz Zarządzanie wpisami tajnymi.

Tworzenie notesu i dodawanie kodu potoku w celu korzystania ze zdarzeń

Poniższy przykład odczytuje zdarzenia IoT z tematu, ale możesz dostosować przykład wymagań aplikacji. Najlepszym rozwiązaniem jest użycie ustawień potoku Delta Live Tables w usłudze Databricks w celu skonfigurowania zmiennych aplikacji. Następnie kod potoku używa funkcji do pobierania spark.conf.get() wartości. Aby uzyskać więcej informacji na temat używania ustawień potoku do sparametryzowania potoku, zobacz Use parameters with Delta Live Tables pipelines (Używanie parametrów z potokami delta live tables).

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Tworzenie potoku

Utwórz nowy potok przy użyciu następujących ustawień, zastępując wartości symboli zastępczych odpowiednimi wartościami dla danego środowiska.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> z nazwą kontenera konta usługi Azure Storage.
  • <storage-account-name> z nazwą konta magazynu usługi ADLS Gen2.
  • <eh-namespace> na nazwę Twojej przestrzeni nazw usługi Event Hubs.
  • <eh-policy-name> z kluczem zakresu wpisu tajnego dla klucza zasad usługi Event Hubs.
  • <eventhub> z nazwą wystąpienia usługi Event Hubs.
  • <secret-scope-name> z nazwą zakresu wpisu tajnego usługi Azure Databricks, który zawiera klucz zasad usługi Event Hubs.

Najlepszym rozwiązaniem jest to, że ten potok nie używa domyślnej ścieżki magazynu systemu plików DBFS, ale zamiast tego używa konta magazynu usługi Azure Data Lake Storage Gen2 (ADLS Gen2). Aby uzyskać więcej informacji na temat konfigurowania uwierzytelniania dla konta magazynu usługi ADLS Gen2, zobacz Bezpieczne uzyskiwanie dostępu do poświadczeń magazynu przy użyciu wpisów tajnych w potoku.