次の方法で共有


Apache Kafka と Azure Databricks を使用したストリーム処理

この記事では、Azure Databricks で構造化ストリーミング ワークロードを実行するときに、Apache Kafka をソースまたはシンクとして使用する方法について説明します。

Kafka の詳細については、Kafka のドキュメントを参照してください。

Kafka からデータを読み取る

Kafka から読み取られたストリーミングの例を次に示します。

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

Azure Databricks では、次の例に示すように、Kafka データ ソースのバッチ読み取りセマンティクスもサポートされています。

df = (spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
)

増分バッチ読み込みの場合、Databricks では、Trigger.AvailableNow で Kafka を使用することをお勧めします。 「増分バッチ処理の構成」を参照してください。

Databricks Runtime 13.3 LTS 以降では、Azure Databricks には Kafka データを読み取るための SQL 関数が用意されています。 SQL を使用したストリーミングは、Delta Live Tables または Databricks SQL のストリーミング tables でのみサポートされます。 read_kafka table-valued 関数のを参照してください。

Kafka 構造化ストリーミング リーダーを構成する

Azure Databricks では、Kafka 0.10 以降に connections を構成するためのデータ形式として kafka キーワードが提供されます。

Kafka の最も一般的な構成を次に示します。

サブスクライブするトピックを指定するには、複数の方法があります。 次のいずれかの parametersのみを指定する必要があります。

オプション 説明
subscribe トピックのコンマ区切りlist。 サブスクライブするトピックのlist。
subscribePattern Java 正規表現文字列。 トピックのサブスクライブに使用するパターン。
assign JSON 文字列 {"topicA":[0,1],"topic":[2,4]} 使用する特定の topicPartition。

その他の注目すべき構成:

オプション Default value 説明
kafka.bootstrap.servers "ホスト:ポート" のコンマ区切りlist。 empty [必須] Kafka bootstrap.servers の構成。 Kafka からのデータがない場合は、最初にブローカー アドレス list 確認します。 ブローカー アドレス list が正しくない場合は、エラーが発生しない可能性があります。 これは、Kafka クライアントによって、ブローカーは最終的に利用可能になると想定され、ネットワーク エラーが発生した場合は永久に再試行されるためです。
failOnDataLoss true または false true [省略可能] データが失われる恐れがある場合にクエリを失敗させるかどうか。 クエリは、トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、Kafka からのデータの読み取りに永続的に失敗する可能性があります。 データが失われた可能性があるかどうかについて控えめな推定を試みます。 これにより、誤ってアラームが発生する場合があります。 オプションを Set に設定し、期待どおりに機能しない場合や、データが失われたにもかかわらず処理を続行したい場合は false に設定します。
minPartitions >= 0 の整数、0 = 無効。 0 (無効) [省略可能] Kafka から読み取るパーティションの最小数。 minPartitions オプションを使用して、Kafka から読み取るパーティションの最小数に任意の数を指定して Spark を構成することができます。 通常、Spark では、Kafka の topicPartitions と、Kafka から使用する Spark パーティションが 1 対 1 でマッピングされます。 minPartitions オプションを Kafka topicPartitions より大きい値に set すると、Spark は大きな Kafka パーティションをより小さな部分に分割します。 このオプションは、ピーク時の負荷やデータスキューが発生した場合、ストリームが遅れている際に、処理速度を向上させるためにsetとして設定できます。 トリガーごとに Kafka コンシューマーを初期化すると、Kafka に接続するときに SSL を使用する場合、パフォーマンスに影響を与える可能性があります。
kafka.group.id Kafka コンシューマー グループ ID。 set ではない [省略可能] Kafka から読み取り中に使用するグループ ID。 注意して使用する必要があります。 既定では、各クエリによって、データを読み取るための一意のグループ ID が生成されます。 これにより、各クエリでは、独自のコンシューマー グループが使用され、他のコンシューマーによる干渉を受けないため、サブスクライブされたトピックのすべてのパーティションを確実に読み取ることができます。 一部のシナリオ (たとえば、Kafka グループ ベースの承認) では、データを読み取るために特定の承認済みグループ ID を使用することが必要な場合があります。 必要に応じて、グループ ID を set できます。 ただし、予期しない動作が発生する可能性があるため、細心の注意を払って設定してください。

- クエリ (バッチとストリーミングの両方) を同じグループ ID で同時実行すると、相互に干渉し合い、各クエリでデータの一部しか読み取れなくなる可能性があります。
- これは、クエリを立て続けに起動または再起動した場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms が非常に小さくなるようにsetします。
startingOffsets earliest、latest latest [省略可能] クエリが開始されたときの開始点。最も古いオフセットの "earliest" か、各 TopicPartition の開始offsetを指定する JSON 文字列。 JSON では、offsetとして -2 を使用して earliest を、-1 で latest を示すことができます。 注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始されたときにのみ適用され、再開は常にクエリが中断された where から取得されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。

他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。

Kafka レコードのSchema

Kafka レコードの schema は以下の通りです。

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

keyvalue は、ByteArrayDeserializer を使用して常にバイト配列として逆シリアル化されます。 DataFrame 操作 (cast("string")など) を使用して、キーと valuesを明示的に逆シリアル化します。

Kafka にデータを書き込む

Kafka へのストリーミング書き込みの例を次に示します。

(df
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .start()
)

Azure Databricks では、次の例に示すように、Kafka データ シンクへのバッチ書き込みセマンティクスもサポートされています。

(df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("topic", "<topic>")
  .save()
)

Kafka 構造化ストリーミング ライターを構成する

重要

Databricks Runtime 13.3 LTS 以降には、既定でべき等の書き込みを有効にする新しいバージョンの kafka-clients ライブラリが含まれています。 Kafka シンクでバージョン 2.8.0 以下を使用していて、ACL が構成されているが IDEMPOTENT_WRITE が有効になっていない場合、書き込みはエラー メッセージ org.apache.kafka.common.KafkaException:Cannot execute transactional method because we are in an error state で失敗します。

このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に .option(“kafka.enable.idempotence”, “false”) を設定します。

DataStreamWriter に提供される schema は、Kafka シンクと対話します。 次のフィールドを使用できます。

Columnの名前 必須または省略可能 Type
key 省略可能 STRING または BINARY
value 必須 STRING または BINARY
headers 省略可能 ARRAY
topic 省略可能(topic がライターオプションで set の場合は無視されます) STRING
partition 省略可能 INT

Kafka への書き込み中にsetされる一般的なオプションを次に示します。

オプション Default value 説明
kafka.boostrap.servers <host:port> のコンマ区切りlist。 なし [必須] Kafka bootstrap.servers の構成。
topic STRING set ではない [省略可能] すべての行のトピックが書き込まれるように設定します。 このオプションは、データに存在するすべてのトピック column をオーバーライドします。
includeHeaders BOOLEAN false [省略可能] 行に Kafka ヘッダーを含めるかどうか。

他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。

Kafka メトリックを取得する

avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest メトリックを使って、サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のoffsetより後であるオフセットの平均、最小、最大数をgetできます。 「対話形式によるメトリックの読み取り」を参照してください。

注意

Databricks Runtime 9.1 以降で使用できます。

estimatedTotalBytesBehindLatest の値を調べて、サブスクライブされたトピックからクエリ プロセスによって消費されていない推定合計バイト数をGetします。 この推定値は、過去 300 秒間に処理されたバッチ数に基づきます。 推定値の基になる期間は、オプション bytesEstimateWindowLength を別の値に設定することによって変更できます。 たとえば、10 分に set 設定するには、次のようにします。

df = (spark.readStream
  .format("kafka")
  .option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)

ノートブックでストリームを実行している場合、これらのメトリックは、ストリーミング クエリの進行状況ダッシュボードの [生データ] タブに表示されます。

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

SSL を使用して Azure Databricks を Kafka に接続する

Kafka に対して SSL を有効にするには、Sslを使用した暗号化と認証 Confluent のドキュメントの指示に従います。 オプションとして、そこで説明されている構成を、プレフィックス kafka. を付けて指定できます。 たとえば、プロパティ kafka.ssl.truststore.location で、信頼ストアの場所を指定します。

Databricks では、次のことが推奨されています。

  • 証明書をクラウド オブジェクト ストレージに格納します。 証明書へのアクセスは、Kafka にアクセスできるクラスターのみに制限できます。 Unity を使用した データ ガバナンスを参照してください。
  • 証明書のパスワードをシークレットとしてシークレット スコープに格納します。

次の例では、オブジェクト ストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。

df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ...)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", <truststore-location>)
  .option("kafka.ssl.keystore.location", <keystore-location>)
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)

HDInsight 上の Kafka を Azure Databricks に接続する

  1. HDInsight Kafka クラスターを作成します。

    手順については、「Azure Virtual Network 経由で HDInsight 上の Apache Kafka に接続する」を参照してください。

  2. 正しいアドレスをアドバタイズするように Kafka ブローカーを構成します。

    IP をアドバタイズするように Kafka を構成する」の手順に従います。 Azure Virtual Machines で Kafka を自分で運用する場合は、ブローカーの advertised.listeners 構成がホストの内部 IP に set 設定されていることを確認します。

  3. Azure Databricks クラスターを作成します。

  4. Kafka クラスターを Azure Databricks クラスターにピアリングします。

    仮想ネットワークをピアリングする」の手順に従います。

Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証

Azure Databricks は、Event Hubs サービスを使用した Spark ジョブの認証をサポートしています。 認証は Microsoft Entra ID によって OAuth で経由で行われます。

AAD 認証のダイアグラム

Azure Databricks では、以下のコンピューティング環境で、クライアント ID とシークレットを使用した Microsoft Entra ID 認証をサポートしています。

  • シングル ユーザー アクセス モードで構成されたコンピューティング上の Databricks Runtime 12.2 LTS 以降。
  • 共有アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.3 LTS 以降。
  • Unity Catalogなしで構成された Delta Live Tables パイプライン。

Azure Databricks では、どのコンピューティング環境でも、Unity Catalogで構成された Delta Live Tables パイプラインでも、証明書を使用した Microsoft Entra ID 認証はサポートされていません。

この認証は、共有クラスターまたは Unity Catalog Delta Live Tablesでは機能しません。

構造化ストリーミング Kafka コネクタの構成

Microsoft Entra ID で認証を実行するには、次の valuesが必要です。

  • テナント ID。 これは、Microsoft Entra ID の [サービス] タブにあります。

  • clientID (アプリケーション ID とも呼ばれます)。

  • クライアント シークレット。 これを取得したら、シークレットとして Databricks ワークスペースに追加する必要があります。 このシークレットを追加するには、「シークレットの管理」を参照してください。

  • EventHubs トピック。 トピックのlistは、特定の Event Hubs 名前空間ページの [エンティティ] セクションの [Event Hubs] セクションにあります。 複数のトピックを操作するには、Event Hubs レベルで IAM ロールを set できます。

  • EventHubs サーバー。 これは、特定の Event Hubs 名前空間の概要ページにあります。

    Event Hubs 名前空間

さらに、Entra ID を使用するには、OAuth SASL メカニズム (SASL は汎用プロトコルであり、OAuth は SASL "メカニズム" の一種です) を使用するように Kafka に指示する必要があります。

  • kafka.security.protocol は、SASL_SSL である必要があります。
  • kafka.sasl.mechanism は、OAUTHBEARER である必要があります。
  • kafka.sasl.login.callback.handler.class は、シェーディングされた Kafka クラスのログイン コールバック ハンドラーについて値 kafkashaded を持つ Java クラスの完全修飾名にする必要があります。 正確なクラスについては、次の例を参照してください。

次に、実行例を見てみましょう。

Python

# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")

event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------

sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'

kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,

# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}

df = spark.readStream.format("kafka").options(**kafka_options)

display(df)

Scala

// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")

val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------

val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""

val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,

// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)

val scalaDF = spark.readStream
  .format("kafka")
  .options(kafkaOptions)
  .load()

display(scalaDF)

潜在的なエラーの処理

  • ストリーミング オプションがサポートされません。

    Unity Catalog で構成された Delta Live Tables パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが表示されることがあります。

    サポートされていないストリーミング エラー

    このエラーを解決するには、サポートされているコンピューティング構成を使用します。 「Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証」を参照してください。

  • 新しい KafkaAdminClient の作成に失敗しました。

    これは、次のいずれかの認証オプションが正しくない場合に Kafka がスローする内部エラーです。

    • クライアント ID (アプリケーション ID とも呼ばれます)
    • テナント ID
    • Event Hubs サーバー

    エラーを解決するには、これらのオプションの values が正しいことを確認します。

    また、この例で既定で提供されている (変更しないように求められた) kafka.security.protocol などの構成オプションを変更すると、このエラーが表示される場合があります。

  • 返されるレコードがありません

    データ フレームを表示または処理しようとしているが結果が得られない場合は、UI に次の情報が表示されます。

    結果メッセージなし

    このメッセージは、認証は成功したが、Event Hubs がデータを返さなかったことを意味します。 次のような理由が考えられます (ただし、決して網羅的ではありません)。

    • 正しくない EventHubs トピックを指定しました。
    • startingOffsets の既定の Kafka 構成オプションが latest であり、現在、トピック経由でデータをまだ受信していません。 setstartingOffsetstoearliest を使用して Kafka の最も古いオフセットからデータの読み取りを開始できます。