共用方式為


使用 Apache Kafka 和 Azure Databricks 進行串流處理

本文說明如何在 Azure Databricks 上執行結構化串流工作負載時,使用 Apache Kafka 作為來源或接收器。

如需更多 Kafka,請參閱 Kafka 文件

從 Kafka 讀取資料

以下是從 Kafka 串流讀取的範例:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks 也支援 Kafka 資料來源的批次讀取語意,如下列範例所示:

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

對於累加式批次載入,Databricks 建議將 Kafka 與 Trigger.AvailableNow 搭配使用。 請參閱設定累加批次處理

在 Databricks Runtime 13.3 LTS 和更新版本中,Azure Databricks 提供 SQL 函式來讀取 Kafka 資料。 僅差異即時資料表或 Databricks SQL 中的串流資料表支援使用 SQL 進行串流處理。 請參閱 read_kafka 資料表值函式

設定 Kafka 結構化串流讀取器

Azure Databricks 提供 kafka 關鍵字作為資料格式,以設定 Kafka 0.10+ 的連線。

以下是 Kafka 的最常見組態:

有多種方式指定要訂閱的主題。 您應僅提供下列其中一個參數:

選項 Description
訂閱 主題的逗號分隔清單。 要訂閱的主題清單。
subscribePattern Java regex 字串。 用於訂閱主題的模式。
assign JSON 字串 {"topicA":[0,1],"topic":[2,4]} 要取用的特定 topicPartitions。

其他值得注意的組態:

選項 預設值 說明
kafka.bootstrap.servers host:port 的逗號分隔清單。 empty [必要] Kafka bootstrap.servers 組態。 如果沒有找到來自 Kafka 的資料,請先檢查訊息代理程式地址清單。 如果訊息代理程式地址清單不正確,可能沒有任何錯誤。 這是因為 Kafka 用戶端假設訊息代理程式最終將變得可用,並且在發生網路錯誤的情況下會永遠重試。
failOnDataLoss truefalse true [選用] 在資料可能遺失的情況下是否讓查詢失敗。 在許多情況下 (例如主題已刪除、主題在處理前已截斷等等),查詢可能永遠也無法從 Kafka 讀取資料。 我們會嘗試保守地估計資料是否可能遺失。 有時這可能會導致誤報。 如果此選項無法如預期般運作,或者您希望查詢不管資料是否遺失都繼續處理,請將此選項設定為 false
minPartitions 整數 >= 0,0 = 停用。 0 (已停用) [選用] 要從 Kafka 讀取的分割數目下限。 您可以使用 minPartitions 選項將 Spark 設定為使用任意最少數目的分割從 Kafka 讀取。 通常,Spark 在 Kafka topicPartitions 與從 Kafka 取用的 Spark 分割之間存在 1-1 對應。 如果您將 minPartitions 選項設定為大於 Kafka topicPartitions 的值,則 Spark 會將大型 Kafka 分割拆分為較小的部分。 您可以在出現尖峰負載時、資料扭曲時以及在串流落後時,設定此選項來提高處理速率。 這需要在每次觸發時初始化 Kafka 使用者。如果您在連線至 Kafka 時使用 SSL,這可能會影響效能。
kafka.group.id Kafka 取用者群組識別碼。 未設定 [選用] 從 Kafka 讀取時要使用的群組識別碼。 請謹慎使用此選項。 根據預設,每個查詢都會產生用於讀取資料的唯一群組識別碼。 這可確保每個查詢都具有自己的取用者群組,不會受到任何其他取用者的干擾,因此可以讀取其訂閱的主題的所有分割。 在某些情況下 (例如,Kafka 群組型授權),您可能希望使用特定的授權群組識別碼來讀取資料。 您可以選擇性地設定群組識別碼。 但是,執行此作業時要特別謹慎,因為它可能會導致非預期行為。

- 同時執行具有相同群組識別碼的查詢 (包括批次和串流) 可能會相互干擾,導致每個查詢僅讀取部分資料。
- 快速連續啟動/重新啟動查詢時,也可能會發生這種情況。 若要將此類問題降至最低,請將 Kafka 取用者組態 session.timeout.ms 設定為非常小。
startingOffsets 最早、最新 最新 [選用] 啟動查詢時的起點,可以是從最早位移開始的「最早」,也可以是指定每個 TopicPartition 的起始位移的 json 字串。 在 json 中,可使用 -2 作為位移來表示最早,使用 -1 表示最新。 注意:對於批次查詢,不允許使用最新 (無論是隱式還是在 json 中使用 -1)。 對於串流查詢,這僅在啟動新查詢時適用,並且繼續始終從查詢停止的位置開始。 查詢期間新探索的分割將最早啟動。

如需其他選用組態,請參閱結構化串流 Kafka 整合指南

Kafka 記錄的結構描述

Kafka 記錄的結構描述為:

資料行 類型
key binary
value binary
主題 字串
磁碟分割 int
offset long
timestamp long
timestampType int

一律使用 keyvalueByteArrayDeserializer 還原序列化為位元組陣列。 請使用 DataFrame 作業 (例如 cast("string")) 明確還原序列化索引鍵和值。

將資料寫入 Kafka

以下是串流寫入 Kafka 的範例:

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks 也支援 Kafka 資料接收器的批次寫入語意,如下列範例所示:

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

設定 Kafka 結構化串流寫入器

重要

Databricks Runtime 13.3 LTS 和更新版本包含一個較新版本的 kafka-clients 程式庫,該程式庫預設啟用等冪寫入。 如果 Kafka 接收器使用 2.8.0 或更低版本,並且已設定 ACL 但未啟用 IDEMPOTENT_WRITE,則寫入將會失敗並出現錯誤訊息 org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state

透過升級至 Kafka 2.8.0 或更新版本或者在設定結構化串流寫入器時設定 .option(“kafka.enable.idempotence”, “false”),來解決此錯誤。

提供給 DataStreamWriter 的結構描述與 Kafka 接收器互動。 您可以使用下列欄位:

資料行名稱 必要或選用 類型
key 選用 STRINGBINARY
value 必要 STRINGBINARY
headers 選用 ARRAY
topic 選用 (如果將 topic 設定為寫入器選項,則略過) STRING
partition 選用 INT

以下是寫入 Kafka 時設定的常見選項:

選項 預設值 說明
kafka.boostrap.servers <host:port> 的逗號分隔清單 none [必要] Kafka bootstrap.servers 組態。
topic STRING 未設定 [選用] 設定要寫入的所有資料列的主題。 此選項會覆寫資料中存在的任何主題資料行。
includeHeaders BOOLEAN false [選用] 是否要在資料列中包含 Kafka 標頭。

如需其他選用組態,請參閱結構化串流 Kafka 整合指南

擷取 Kafka 計量

您可以取得串流查詢在所有訂閱主題中最新可用位移之後使用的位移的平均值、最小值和最大值,即 avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest 計量。 請參閱以互動方式讀取計量

注意

在 Databricks Runtime 9.1 和更新版本中可用。

透過檢查 estimatedTotalBytesBehindLatest 的值,取得查詢流程尚未從訂閱主題取用的估計總位元組數。 此估計值基於在過去 300 秒內處理的批次。 您可以透過將選項 bytesEstimateWindowLength 設定為其他值,來變更估計值所基於的時間範圍。 例如,若要將其設定為 10 分鐘,請執行以下命令:

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

如果在筆記本中執行串流,您可以在串流查詢進度儀表板中的 [原始資料] 索引標籤下查看這些計量:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

使用 SSL 將 Azure Databricks 連線至 Kafka

若要啟用 Kafka 的 SSL 連線,請遵循 Confluent 文件使用 SSL 進行加密和驗證中的指示。 您可以提供此處所述的組態 (以 kafka. 為首碼) 作為選項。 例如,您可以在屬性 kafka.ssl.truststore.location 中指定信任存放區位置。

Databricks 建議您:

  • 將您的憑證儲存在雲端物件儲存體中。 您可以將對憑證的存取權限制為僅可以存取 Kafka 的叢集。 請參閱使用 Unity 目錄進行資料控管。
  • 將憑證密碼儲存為祕密範圍中的祕密

下列範例使用物件儲存位置和 Databricks 祕密來啟用 SSL 連線:

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

將 HDInsight 上的 Kafka 連線至 Azure Databricks

  1. 建立 HDInsight Kafka 叢集。

    如需相關指示,請參閱透過 Azure 虛擬網路連線至 HDInsight 上的 Kafka

  2. 設定 Kafka 訊息代理程式來公告正確的位址。

    請遵循設定 Kafka 進行 IP 公告中的指示。 如果在 Azure 虛擬機器上自行管理 Kafka,請確保將訊息代理程式的 advertised.listeners 組態設定為主機的內部 IP。

  3. 建立 Azure Databricks 叢集。

  4. 將 Kafka 叢集對等互連至 Azure Databricks 叢集。

    請遵循對等互連虛擬網路中的指示。

使用 Microsoft Entra ID 和 Azure 事件中樞進行服務主體驗證

Azure Databricks 支援使用事件中樞服務對 Spark 作業進行驗證。 此驗證是透過具有 Microsoft Entra ID 的 OAuth 來完成。

AAD 驗證圖表

Azure Databricks 支援在下列計算環境中使用用戶端識別碼和祕密進行 Microsoft Entra ID 驗證:

  • 設定了單一使用者存取模式的計算上的 Databricks Runtime 12.2 LTS 和更新版本。
  • 設定了共用存取模式的計算上的 Databricks Runtime 14.3 LTS 和更新版本。
  • 未設定 Unity Catalog 的差異即時資料表管線。

AAzure Databricks 不支援在任何計算環境中或在設定了 Unity Catalog 的差異即時資料表管線中使用憑證進行 Microsoft Entra ID 驗證。

此驗證不適用於共用叢集或 Unity Catalog 差異即時資料表。

設定結構化串流 Kafka 連接器

若要使用 Microsoft Entra ID 執行驗證,您需要下列值:

  • 租用戶識別碼。 您可以在 [Microsoft Entra ID] 服務索引標籤中找到此項。

  • clientID (也稱為應用程式識別碼)。

  • 用戶端密碼。 在您具有此密碼後,應將其作為祕密新增至 Databricks 工作區。 若要新增此祕密,請參閱祕密管理

  • EventHubs 主題。 您可以在特定 [事件中樞命名空間] 頁面上 [實體] 部分下的 [事件中樞] 部分找到主題清單。 若要使用多個主題,您可以在事件中樞層級設定 IAM 角色。

  • EventHubs 伺服器。 您可以在特定 [事件中樞命名空間] 的概觀頁面上找到此項:

    事件中樞命名空間

此外,若要使用 Entra ID,我們需要告知 Kafka 使用 OAuth SASL 機制 (SASL 是一般通訊協定,而 OAuth 是 SASL「機制」的一種類型):

  • kafka.security.protocol 應為 SASL_SSL
  • kafka.sasl.mechanism 應為 OAUTHBEARER
  • kafka.sasl.login.callback.handler.class 應是 Java 類別的完整名稱,值為 kafkashaded 表示遮蔽 Kafka 類別的登入回撥處理常式。 如需確切的類別,請參閱下列範例。

範例

接下來,讓我們看看執行中的範例:

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

處理潛在錯誤

  • 不支援串流選項。

    如果您嘗試在設定了 Unity Catalog 的差異即時資料表管線中使用此驗證機制,則可能會收到下列錯誤:

    不支援的串流錯誤

    若要解決此錯誤,請使用支援的計算組態。 請參閱使用 Microsoft Entra ID 和 Azure 事件中樞進行服務主體驗證

  • 無法建立新的 KafkaAdminClient

    此為內部錯誤,在下列任何驗證選項不正確時 Kafka 都會擲回此錯誤:

    • 用戶端識別碼 (也稱為應用程式識別碼)
    • 租用戶識別碼
    • EventHubs 伺服器

    若要解決此錯誤,請驗證這些選項的值是否正確。

    此外,如果您修改範例中預設提供的組態選項 (系統要求您不修改),例如 kafka.security.protocol,您可能會看到此錯誤。

  • 未傳回任何記錄

    如果嘗試顯示或處理 DataFrame 但未取得結果,您將在 UI 中看到下列內容。

    無結果訊息

    此訊息表示驗證成功,但 EventHubs 未傳回任何資料。 一些可能的 (儘管並不詳盡) 原因包括:

    • 指定的 EventHubs 主題錯誤。
    • startingOffsets 的預設 Kafka 組態選項是 latest,而且目前未透過主題收到任何資料。 您可以設定 startingOffsetstoearliest 以從 Kafka 的最早位移開始讀取資料。