Использование Центров событий Azure в качестве источника данных DLT
В этой статье объясняется, как использовать DLT для обработки сообщений из Центров событий Azure. Нельзя использовать разъём Event Hubs для структурированной потоковой передачи , так как эта библиотека недоступна в среде выполнения Databricks, и DLT не позволяет использовать сторонние библиотеки JVM.
Как DLT подключиться к Центрам событий Azure?
Центры событий Azure предоставляют конечную точку, совместимую с Apache Kafka, которую можно использовать с соединителемструктурированной потоковой передачи Kafka, доступным в Databricks Runtime, для обработки сообщений из Центров событий Azure. Дополнительные сведения о совместимости Центров событий Azure и Apache Kafka см. в статье Использование Центров событий Azure из приложений Apache Kafka.
Ниже описаны шаги по подключению конвейера DLT к существующему экземпляру Event Hubs и использованию событий из темы. Чтобы выполнить следующие действия, вам потребуется следующее значение подключения Центров событий:
- Имя пространства имен Центров событий.
- Имя экземпляра Event Hub в пространстве имен Event Hubs.
- Имя политики общего доступа и ключ политики для Центров событий. Для каждого пространства имен Центров событий создается политика
RootManageSharedAccessKey
по умолчанию. Эта политика имеет разрешенияmanage
,send
иlisten
. Если ваш конвейер считывается только из Центров событий, Databricks рекомендует создать новую политику с разрешением только на прослушивание.
Дополнительные сведения о строке подключения к Центрам событий см. в разделе Получение строки подключения к Центрам событий.
Заметка
- Центры событий Azure предоставляют возможности OAuth 2.0 и подписи для совместного доступа (SAS) для авторизации доступа к вашим защищенным ресурсам. Эти инструкции используют проверку подлинности на основе SAS.
- Если вы получите строку подключения Центров событий с портала Azure, она может не содержать значение
EntityPath
. ЗначениеEntityPath
требуется только при использовании соединителя «Structured Streaming Event Hubs». Для использования соединителя Kafka структурированной потоковой передачи требуется только имя топика.
Сохраните ключ политики в хранилище секретов Azure Databricks
Так как ключ доступа является конфиденциальной информацией, Databricks рекомендует не жестко прописывать значение в вашем коде потока данных. Вместо этого используйте секреты Azure Databricks для хранения и управления доступом к ключу.
В следующем примере используется CLI Databricks для создания контейнера секретов и хранения ключа в этом контейнере секретов. В коде конвейера используйте функцию dbutils.secrets.get()
с scope-name
и shared-policy-name
для получения значения ключа.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Дополнительные сведения о секретах Azure Databricks см. в разделе Управление секретами.
Создайте записную книжку и добавьте код конвейера для обработки событий.
В следующем примере считываются события Интернета вещей из топика, но вы можете адаптировать этот пример для требований вашего приложения. Рекомендуется, как лучшая практика, использовать настройки конвейера DLT для конфигурации переменных приложения, как рекомендует Databricks. Затем код конвейера использует функцию spark.conf.get()
для получения значений. Дополнительные сведения об использовании параметров конвейера для параметризации конвейера см. в разделе Использование параметров с конвейерами DLT.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Создание конвейера
Создайте конвейер со следующими параметрами, заменив значения заполнителей соответствующими значениями для вашей среды.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Заменить
-
<container-name>
с именем контейнера учетной записи хранения Azure. -
<storage-account-name>
с именем учетной записи хранения ADLS 2-го поколения. -
<eh-namespace>
с именем вашего пространства имен Центров событий. -
<eh-policy-name>
с ключом области секрета для ключа политики Центров событий. -
<eventhub>
с именем экземпляра Event Hubs. -
<secret-scope-name>
с именем области секрета Azure Databricks, содержащей ключ политики Центров событий.
В качестве рекомендации этот конвейер не использует путь к хранилищу DBFS по умолчанию, а вместо этого использует учетную запись хранения Azure Data Lake Storage 2-го поколения (ADLS 2-го поколения). Для получения дополнительной информации о настройке аутентификации для учетной записи хранения ADLS Gen2 см. раздел "Безопасный доступ к учетным данным хранилища с использованием секретов в конвейере".