Azure Event Hubs 用 Apache Kafka のトランザクション
この記事では、Azure Event Hubs で Apache Kafka トランザクション API を使用する方法について詳しく説明します。
概要
Event Hubs は、独自の Kafka クラスターを実行する代わりに、既存の Kafka クライアント アプリケーションが使用できる Kafka エンドポイントを提供します。 Event Hubs は、既存の Kafka アプリケーションの多くで動作します。 詳細については、Apache Kafka 用の Event Hubs に関するページを参照してください。
このドキュメントでは、Kafka のトランザクション API を Azure Event Hubs でシームレスに使用する方法に焦点を当てています。
Note
現在、Kafka Transactions は Premium および Dedicated レベルでパブリック プレビュー段階です。
Apache Kafka のトランザクション
クラウド ネイティブ環境では、アプリケーションはネットワークの中断や名前空間の再起動とアップグレードに対して回復性を持たせる必要があります。 厳密な処理の保証を必要とするアプリケーションは、アプリケーションとデータの状態が確実に管理されるように、トランザクション フレームワークまたは API を利用して、すべての操作が実行されるか、まったく実行されないことを保証する必要があります。 一連の操作が失敗した場合は、適切な処理が保証されるように、確実にアトミックに再試行することができます。
Note
通常、トランザクションの保証は、"すべてか無か" の方法で処理する必要がある複数の操作がある場合に必要です。
他のすべての操作については、クライアント アプリケーションは、特定の操作が失敗した場合に、エクスポネンシャル バックオフを使用して操作を再試行するために既定で回復性があります。
Apache Kafka には、同じまたは異なるトピックまたはパーティション セット全体に対してこのレベルの処理を保証するトランザクション API が用意されています。
トランザクションは以下の場合に適用されます。
- トランザクション プロデューサー。
- セマンティクスを 1 回だけ処理します。
トランザクション プロデューサー
トランザクション プロデューサーは、異なるトピックにまたがる複数のパーティションにデータがアトミックに書き込まれることを保証します。 プロデューサーはトランザクションを開始し、同じトピックまたは異なるトピックにまたがる複数のパーティションに書き込み、トランザクションをコミットまたは中止できます。
プロデューサーがトランザクションであることを保証するには、enable.idempotence
を true に設定して、データが 1 回だけ書き込まれるようにする必要があります。こうすることで "送信" 側での重複が回避されます。 さらに、プロデューサーを一意に特定するために transaction.id
を設定する必要があります。
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
プロデューサーが初期化された後に、以下の呼び出しにより、プロデューサーがトランザクション プロデューサーとしてブローカーに登録されます。
producer.initTransactions();
次に、プロデューサーはトランザクションを明示的に開始し、通常どおり異なるトピックとパーティションにまたがる送信操作を実行してから、以下の呼び出しでトランザクションをコミットする必要があります。
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
障害またはタイムアウトによりトランザクションを中止する必要がある場合、プロデューサーは abortTransaction()
メソッドを呼び出すことができます。
producer.abortTransaction();
1 回のみのセマンティクス
1 回のみのセマンティクスは、プロデューサーのトランザクション スコープにコンシューマーを追加することによってトランザクション プロデューサーに基づいて構築されます。そのため、各レコードの読み取り、処理、書き込みは確実に 1 回のみ行われます。
まず、トランザクション プロデューサーのインスタンスが作成されます。
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
次に、以下のプロパティを設定して、非トランザクション メッセージまたはコミットされたトランザクション メッセージのみを読み取るようにコンシューマーを構成する必要があります。
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
コンシューマーのインスタンスが作成されると、レコードを読み取る必要があるトピックをサブスクライブできます。
consumer.subscribe(singleton("inputTopic"));
コンシューマーが入力トピックからレコードをポーリングした後、プロデューサーは、レコードが処理されて出力トピックに書き込まれるトランザクション スコープを開始します。 レコードが書き込まれると、すべてのパーティションのオフセットの更新されたマップが作成されます。 次に、プロデューサーは、トランザクションをコミットする前に、この更新されたオフセット マップをトランザクションに送信します。
例外が発生した場合、トランザクションは中止され、プロデューサーは処理をアトミックに再試行します。
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord(“outputTopic”, record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
警告
トランザクションが max.transaction.timeout.ms
より前にコミットまたは中止されなかった場合、トランザクションは Event Hubs によって自動的に中止されます。 既定の max.transaction.timeout.ms
は、Event Hubs によって 15 分に設定されていますが、プロデューサーは、プロデューサー構成プロパティの transaction.timeout.ms
プロパティを設定することで、より低い値にオーバーライドできます。
移行ガイド
Azure Event Hubs で使用したい既存の Kafka アプリケーションがある場合は、迅速に作業を開始できるように、Azure Event Hubs の Kafka 移行ガイドを確認してください。
次のステップ
Event Hubs と Kafka 用 Event Hubs の詳細については、次の記事を参照してください。