次の方法で共有


Azure Event Hubs 用の Kafka Streams

この記事では、Azure Event Hubs で Kafka Streams クライアント ライブラリを使う方法について詳しく説明します。

Note

Kafka Streams 機能は、Event Hubs の Premium と Dedicated レベルでのみ、パブリック プレビューで使用できます。

概要

Apache Kafka Streams は Java 専用のクライアント ライブラリであり、Kafka トピックに格納されているデータに対するストリーミング データの処理とリアルタイム アプリケーションの構築のためのフレームワークを提供します。 すべての処理のスコープはクライアントですが、Kafka トピックは、出力が宛先トピックに書き込まれる前に、中間データのデータ ストアとして機能します。

Event Hubs は、ユーザー独自の Kafka クラスターを実行する代わりとして、既存の Kafka クライアント アプリケーションで使われる Kafka エンドポイントを提供します。 Event Hubs は、既存の Kafka アプリケーションの多くで動作します。 詳細については、Apache Kafka 用の Event Hubs に関するページを参照してください。

Azure Event Hubs での Kafka Streams の使用

Azure Event Hubs では、AMQP と Kafka の両方のプロトコルがネイティブにサポートされます。 ただし、Kafka Streams の動作の互換性を保証するには、Kafka クライアントで既定の構成パラメーターの一部を更新する必要があります。

プロパティ Event Hubs の既定の動作 Kafka Streams 用に変更された動作 説明
messageTimestampType AppendTime に設定されます CreateTime に設定する必要があります Kafka Streams は、追加タイムスタンプではなく作成タイムスタンプに依存します
message.timestamp.difference.max.ms 最大許容値は 90 日です プロパティは、過去のタイムスタンプの管理のみに使われます。 将来の時間は 1 時間に設定され、変更できません。 これは Kafka プロトコルの仕様に従っています
min.compaction.lag.ms 最大許容値は 2 日です
無期限保持トピック トピック パーティションごとに、250 GB のサイズに基づく切り詰め
無期限保持トピックのレコード削除 API 実装されていません。 回避策として、トピックを更新し、有限保持時間を設定できます。 これは GA で行われます

その他の考慮事項

留意すべき他の考慮事項を次に示します。

  • Kafka ストリーム クライアント アプリケーションは、ストリーム処理用の一時的なトピックを作成できるように、名前空間全体に対する管理、読み取り、書き込みのアクセス許可を付与される必要があります。
  • 一時的なトピックとパーティションは、特定の名前空間のクォータに加算されます。 名前空間またはクラスターをプロビジョニングするときは、これらを考慮する必要があります。
  • "オフセット" ストアの無期限保持時間は、SKU の最大メッセージ保持時間によって制限されます。 これらのレベルの具体的な値については、Event Hubs のクォータに関する記事をご覧ください。

これには、AppendTime (つまり、ログ追加時刻) の代わりに CreateTime (つまり、イベント作成時刻) を使うように、messageTimestampType のトピック構成を更新することが含まれます。

既定の動作 (必須) をオーバーライドするには、Azure Resource Manager (ARM) で次のように設定する必要があります。

Note

更新する必要がある構成を強調するため、ARM テンプレートの特定の部分のみを示してあります。

{
  "parameters": {
    "namespaceName": "contoso-test-namespace",
    "resourceGroupName": "contoso-resource-group",
    "eventHubName": "contoso-event-hub-kafka-streams-test",
    ...
    "parameters": {
      "properties": {
        ...
        "messageTimestampType": "CreateTime",
        "retentionDescription": {
          "cleanupPolicy": "Delete",
          "retentionTimeInHours": -1,
          "tombstoneRetentionTimeInHours": 1
        }
      }
    }
  }
}

Kafka Streams の概念

Kafka Streams は、開発者がリアルタイム ストリーミング シナリオをより短時間で始められるように、Kafka プロデューサーとコンシューマーの API に対する単純な抽象化レイヤーを提供します。 この軽量ライブラリは、内部のメッセージング レイヤーを Apache Kafka 互換ブローカー (Azure Event Hubs など) に依存し、フォールト トレラントなローカル状態ストアを管理します。 トランザクション API を使って、Kafka Streams ライブラリは、1 回だけの処理や、一度に 1 レコードの処理などの、豊富な処理機能をサポートします。

順不同で到着するレコードには、イベント時刻ベースのウィンドウ化操作によるメリットがあります。

Note

Kafka Streams のドキュメントKafka Streams の主要な概念をよく理解することをお勧めします。

ストリーム

ストリームは、Kafka トピックの抽象化された表現です。 これは、制限なしで連続的に更新される不変データ レコードのデータ セットで構成され、各データ レコードはキーと値のペアです。

ストリーム処理のトポロジ

Kafka Streams アプリケーションでは、プロセッサのトポロジによって表される DAG (有向非巡回グラフ) を使って計算ロジックが定義されています。 プロセッサのトポロジは、処理ステップを表すストリーム プロセッサ (トポロジ内のノード) が、ストリーム (トポロジ内のエッジ) によって接続されたもので構成されます。

特定の特殊なケースを除き、ストリーム プロセッサはアップストリーム プロセッサまたはダウンストリーム プロセッサにチェーンできます。

  • ソース プロセッサ: これらのプロセッサはアップストリーム プロセッサを持たず、1 つ以上のストリームから直接読み取ります。 その後、ダウンストリーム プロセッサにチェーンできます。
  • シンク プロセッサ: これらのプロセッサはダウンストリーム プロセッサを持たず、ストリームに直接書き込む必要があります。

ストリーム処理トポロジは、Kafka Streams DSL または下位レベルの Processor API を使って定義できます。

ストリームとテーブルの二重性

ストリームとテーブルは、Kafka Streams DSL によって提供される、2 つの異なってはいても有用な抽象化であり、ストリーム処理のユース ケースに共存する必要がある時系列とリレーショナル両方のデータ形式をモデル化します。

Kafka はこれをさらに拡張し、ストリームとテーブルの間に次のような二重性を導入します

  • ストリームは、テーブルの変更ログと見なすことができます。
  • テーブルは、ストリーム内の各キーの最新の値のスナップショットと見なすことができます。

この二重性により、ユース ケースで必要に応じて、テーブルとストリームを入れ替えて使用できます。

次に例を示します。

  • 静的顧客データ (テーブルとしてモデル化) と、動的トランザクション (ストリームとしてモデル化) の結合
  • 1 日のトレーダー ポートフォリオでの変化するポートフォリオ ポジション (ストリームとしてモデル化) と、最新の市場データ フィード (ストリームとしてモデル化) の結合。

時刻

Kafka Streams を使うと、ウィンドウ化関数と猶予関数は、順不順でデータ レコードを取り込みながら、処理に引き続き含めることができます。 この動作を決定論的にするため、Kafka Streams には時間に関する追加の概念があります。 これには以下が含まれます。

  • 作成時刻 ('イベント時刻' とも呼ばれます): イベントが発生してデータ レコードが作成された時刻です。
  • 処理時刻: これは、データ レコードがストリーム処理アプリケーションによって処理された (または使用された) 時刻です。
  • 追加時刻 ('作成時刻' とも呼ばれます): これは、データが格納され、Kafka ブローカーのストレージにコミットされた時刻です。 これは、イベントの作成とブローカーによる実際のインジェストの間の時間差のため、作成時刻とは異なります。

ステートフルな操作

状態管理により、異なるストリームからのデータの結合や集計などの高度なストリーム処理アプリケーションが可能になります。 これは、Kafka Streams によって提供され、Kafka Streams DSL のステートフル演算子を使ってアクセスされる、状態ストアで実現されます。

DSL のステートフル変換には、次のものがあります。

ウィンドウと猶予

Kafka Streams DSL でのウィンドウ化操作を使うと、開発者は、集計や結合などのステートフルな操作のために、特定のキーでレコードをグループ化する方法を制御できます。

ウィンドウ化操作では、猶予期間を指定し、特定のウィンドウで順序が正しくないレコードに対してある程度の柔軟性を持たせることもできます。 特定のウィンドウを対象としたレコードが、特定のウィンドウの後で到着した場合でも、猶予期間内であれば受け付けられます。 猶予期間の終了後に到着したレコードは破棄されます。

アプリケーションでは、ウィンドウ化と猶予期間の制御を利用して、順序が正しくないレコードに対するフォールト トレランスを向上させる必要があります。 適切な値はワークロードによって異なり、実験を通じて明らかにする必要があります。

処理の保証

ビジネスユーザーと技術ユーザーは、ストリーム処理ワークロードの出力から重要なビジネス分析情報を抽出することを目指しており、これは高いトランザクション保証要件につながります。 Kafka Streams は、Kafka のトランザクションと連携し、Kafka 互換ブローカー (Azure Event Hubs など) の基になるストレージ システムと統合して、オフセット コミットと状態ストアの更新がアトミックに書き込まれるようにすることで、トランザクション処理を保証します。

トランザクション処理の保証を実現するには、Kafka Streams 構成の processing.guarantee の設定を、既定値の at_least_once から exactly_once_v2 (Apache Kafka 2.5 以降のクライアント バージョンの場合) または exactly_once (Apache Kafka 2.5.x より前のクライアント バージョンの場合) に更新する必要があります。

次のステップ

この記事では、Kafka 用 Event Hubs の概要について説明しました。 詳細については、Azure Event Hubs 用 Apache Kafka 開発者ガイドを参照してください。

SAS または OAuth を使用してイベント ハブを作成しアクセスする手順のチュートリアルについては、「クイックスタート: Kafka プロトコルを使用した Event Hubs によるデータ ストリーミング」を参照してください。

また、GitHub の OAuth サンプルも参照してください。