次の方法で共有


Python 用 Azure EventHubs Checkpoint Store クライアント ライブラリ - バージョン 1.1.4

ストレージ BLOB の使用

Azure EventHubs チェックポイント ストアは、Azure Event Hubsからのイベントの処理中にチェックポイントを格納するために使用されます。 このチェックポイント ストア パッケージは、 への EventHubConsumerClientプラグイン パッケージとして機能します。 チェックポイントとパーティションの所有権情報を維持するための永続的なストアとして Azure Storage BLOB が使用されます。

これは非同期ライブラリであることに注意してください。Azure EventHubs Checkpoint Store クライアント ライブラリの同期バージョンについては、 azure-eventhub-checkpointstoreblob を参照してください。

ソースコード | パッケージ (PyPi) | API リファレンス ドキュメント | Azure Eventhubs のドキュメント | Azure Storage のドキュメント

作業の開始

前提条件

  • Python 3.6 以降。

  • Microsoft Azure サブスクリプション:Azure Event Hubsを含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Azure アカウントをお持ちでない場合は、無料試用版にサインアップするか、 アカウントの作成時に MSDN サブスクライバー特典を使用できます。

  • Event Hubs 名前空間とイベント ハブ:Azure Event Hubsと対話するには、名前空間と Event Hub も使用できる必要があります。 Azure リソースの作成に慣れていない場合は、Azure portalを使用して Event Hub を作成するためのステップバイステップ ガイドに従ってください。 ここでは、Azure CLI、Azure PowerShell、または Azure Resource Manager (ARM) テンプレートを使用してイベント ハブを作成するための詳細な手順も確認できます。

  • Azure Storage アカウント:Azure Storage アカウントを用意し、blob を含むチェックポイント データを格納するAzure Blob Storageブロック コンテナーを作成する必要があります。 Azure ブロック BLOB ストレージ アカウントの作成に関するガイドに従ってください。

パッケージをインストールする

$ pip install azure-eventhub-checkpointstoreblob-aio

主要な概念

チェックポイント機能

"チェックポイント処理" とは、リーダーがパーティションにおけるイベント シーケンス内の位置をマークまたはコミットするために使用する処理です。 チェックポイント処理はコンシューマーの責任で行います。この処理はコンシューマー グループ内でパーティションごとに発生します。 つまり、コンシューマー グループごとに、各パーティション リーダーは、イベント ストリーム内でのその現在の位置を追跡する必要があり、データ ストリームが完了したと見なしたときにサービスに通知することができます。 リーダーがパーティションから切断し、その後再び接続すると、該当するコンシューマー グループ内の該当するパーティションの最後のリーダーによって最後に送信されたチェックポイントから読み取りが開始されます。 リーダーは接続の際に、このオフセットをイベント ハブに渡して、読み取りを開始する場所を指定します。 このように、チェックポイント処理を使用することで、ダウンストリーム アプリケーションごとにイベントに "完了" のマークを付けると共に、異なるコンピューター上で実行中のリーダー間でフェールオーバーが発生した場合に回復性をもたらすことができます。 このチェックポイント処理で、より小さなオフセットを指定すると、古いデータに戻ることができます。 このメカニズムにより、チェックポイント処理ではフェールオーバーの回復性とイベント ストリームの再生の両方を実現できます。

シーケンス番号の & オフセット

どちらのオフセット & シーケンス番号も、パーティション内のイベントの位置を参照します。 クライアント側のカーソルと考えることができます。 オフセットはイベントのバイト位置です。 オフセット/シーケンス番号を使用すると、イベント コンシューマー (リーダー) は、イベントの読み取りを開始するイベント ストリーム内のポイントを指定できます。 指定されたタイムスタンプの後にのみエンキューされたイベントを受け取るタイムスタンプを指定できます。 Event Hubs サービスの外部で独自のオフセット値を格納する場合は、コンシューマーの責任で行います。 パーティション内では、各イベントにはオフセット、シーケンス番号、およびエンキューされたときのタイムスタンプが含まれます。

EventHubConsumerClient を作成します

を作成 EventHubConsumerClient する最も簡単な方法は、接続文字列を使用する方法です。

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

を作成する EventHubConsumerClientその他の方法については、 EventHubs ライブラリ に関するページを参照してください。

を使用してイベントを使用して BlobCheckpointStore チェックポイントを実行する

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

別のバージョンの Azure Storage Service API でを使用 BlobCheckpointStore する

一部の環境では、Azure Storage Service API のバージョンが異なります。 BlobCheckpointStore 既定では、Storage Service API バージョン 2019-07-07 が使用されます。 別のバージョンに対して使用するには、オブジェクトの作成時に をBlobCheckpointStore指定api_versionします。

トラブルシューティング

全般

ログ記録を有効にすると、トラブルシューティングを行うのに役立ちます。

ログの記録

  • ロガーを有効 azure.eventhub.extensions.checkpointstoreblobaio にして、ライブラリからトレースを収集します。
  • ロガーを有効 azure.eventhub にして、メインの azure-eventhub ライブラリからトレースを収集します。
  • ロガーを有効 azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage にして、Azure Storage BLOB ライブラリからトレースを収集します。
  • 基になる uAMQP ライブラリからトレースを収集するためのロガーを有効にします uamqp
  • クライアントの作成時に を設定 logging_enable=True して、AMQP フレーム レベルのトレースを有効にします。

次のステップ

その他のサンプル コード

EventHubs Checkpoint Store の非同期サンプルの使用を開始します。

ドキュメント

リファレンス ドキュメントについては、 こちらを参照してください

フィードバックの提供

バグが発生した場合、または提案がある場合は、プロジェクトの [問題 ] セクションに問題を提出してください。

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。

インプレッション数