次の方法で共有


Python 用Azure Event Hubs クライアント ライブラリ - バージョン 5.11.5

Azure Event Hubsは、1 秒あたり何百万ものイベントを取り込み、複数のコンシューマーにストリーミングできる、拡張性の高い発行サブスクライブ サービスです。 これにより、接続されたデバイスとアプリケーションによって生成される大量のデータを処理して分析できます。 Event Hubs がデータを収集したら、リアルタイム分析プロバイダーを使用するか、バッチ処理/ストレージ アダプターを使用してデータを取得、変換、格納できます。 Azure Event Hubsの詳細については、「Event Hubs とは」を参照してください。

Azure Event Hubs クライアント ライブラリにより、Azure Event Hubs イベントの発行と利用が可能になります。また、次の用途に使われます。

  • ビジネス インテリジェンスと診断の目的で、アプリケーションに関するテレメトリを生成します。
  • アプリケーションの状態に関する事実を公開します。これにより、関係のある当事者が、アクションを起こすトリガーとして監視し使うことができます。
  • ビジネスや他のエコシステム内で発生する興味深い操作と相互作用を観察し、疎結合されたシステムが密接に結びつくことなくやり取りできるようにします。
  • 1 つ以上の発行元からイベントを受信し、エコシステムのニーズに合わせてそれらを変換し、変換されたイベントをコンシューマーが観察できるように新しいストリームに公開します。

ソースコード | パッケージ (PyPi) | パッケージ (Conda) | API リファレンス ドキュメント | 製品ドキュメント | サンプル

作業の開始

前提条件

  • Python 3.7 以降。

  • Microsoft Azure サブスクリプション:Azure Event Hubsを含む Azure サービスを使用するには、サブスクリプションが必要です。 既存の Azure アカウントをお持ちでない場合は、無料試用版にサインアップするか、 アカウントの作成時に MSDN サブスクライバー特典を使用できます。

  • Event Hubs 名前空間とイベント ハブ:Azure Event Hubsと対話するには、名前空間と Event Hub も使用できる必要があります。 Azure リソースの作成に慣れていない場合は、Azure portalを使用して Event Hub を作成するためのステップ バイ ステップ ガイドに従ってください。 ここでは、Azure CLI、Azure PowerShell、または Azure Resource Manager (ARM) テンプレートを使用してイベント ハブを作成するための詳細な手順も確認できます。

パッケージをインストールする

pip を使用して Python 用Azure Event Hubs クライアント ライブラリをインストールします。

$ pip install azure-eventhub

クライアントを認証する

Event Hubs との対話は、EventHubConsumerClient クラスまたは EventHubProducerClient クラスのインスタンスで始まります。 クライアント オブジェクトをインスタンス化するには、ホスト名、SAS/AAD 資格情報、イベント ハブ名、または接続文字列のいずれかが必要です。

接続文字列からクライアントを作成します。

Event Hubs クライアント ライブラリがイベント ハブと対話するには、Event Hubs 名前空間を作成するときに自動的に作成される接続文字列を使用するのが最も簡単な方法です。 Azure の共有アクセス ポリシーに慣れていない場合は、ステップ バイ ステップ ガイドに従って Event Hubs 接続文字列を取得できます。

  • メソッドはfrom_connection_string、フォームEndpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<yoursharedaccesskeyname>;SharedAccessKey=<yoursharedaccesskey>とエンティティ名の接続文字列を Event Hub インスタンスに取り込みます。 この接続文字列は、Azure portal から取得できます。

azure-identity ライブラリを使用してクライアントを作成します。

または、Credential オブジェクトを使用して、azure-identity パッケージを使用して AAD 経由で認証することもできます。

  • 上記のリンク先のサンプルで示されているこのコンストラクターは、Event Hub インスタンスのホスト名とエンティティ名と 、TokenCredential プロトコルを実装する資格情報を受け取ります。 azure-identity パッケージで使用できるプロトコルの実装TokenCredentialがあります。 ホスト名の形式 <yournamespace.servicebus.windows.net>は です。
  • によって azure-identity提供される資格情報の種類を使用するには、パッケージをインストールしてください。 pip install azure-identity
  • さらに、非同期 API を使用するには、まず、 などの aiohttp非同期トランスポートをインストールする必要があります。 pip install aiohttp
  • Azure Active Directory を使用する場合は、Azure Event Hubs データ所有者ロールなどの Event Hubs へのアクセスを許可するロールがプリンシパルに割り当てられている必要があります。 Event Hubs で Azure Active Directory 承認を使用する方法の詳細については、 関連するドキュメントを参照してください。

主要な概念

  • EventHubProducerClient は、組み込みデバイス ソリューション、モバイル デバイス アプリケーション、コンソールまたはその他のデバイスで実行されているゲーム タイトル、一部のクライアントまたはサーバー ベースのビジネス ソリューション、または Web サイトの一部として、テレメトリ データ、診断情報、使用状況ログ、またはその他のログ データのソースです。

  • EventHubConsumerClient は、そのような情報を Event Hub から取得して処理します。 処理には、集計、複雑な計算、フィルター処理が含まれる場合があります。 生データまたは変換された形式で情報を配布または保存する処理が含まれる場合もあります。 Event Hub コンシューマーは、多くの場合、Azure Stream Analytics、Apache Spark、Apache Storm などの組み込みの分析機能を備えた、堅牢で大規模なプラットフォーム インフラストラクチャ パーツです。

  • パーティションは、Event Hub に保持される順序付けされた一連のイベントです。 Azure Event Hubs では、パーティション分割されたコンシューマー パターンを介してメッセージ ストリーミングを提供し、各コンシューマーがメッセージ ストリームの特定のサブセット (パーティション) のみを読み取ります。 新しいイベントが到着すると、このシーケンスの末尾に追加されます。 パーティションの数は、Event Hub の作成時に指定され、変更することはできません。

  • コンシューマー グループはは、Event Hub 全体のビューです。 コンシューマー グループを使用すると、複数のコンシューマー アプリケーションが個別のイベント ストリーム ビューを持つことができるようになり、それぞれの場所から独自のペースでストリームを個別に読み取ることができます。 コンシューマー グループあたり最大 5 つのリーダーをパーティションに同時に設定できますが、特定のパーティションとコンシューマー グループの組み合わせには、アクティブな 1 つのコンシューマーのみをお勧めします。 各アクティブ リーダーでは、そのパーティションからすべてのイベントを受け取ります。同じパーティションに複数のリーダーがある場合は、重複するイベントを受信します。

その他の概念と詳細な説明については、「 Event Hubs の機能」を参照してください。 また、AMQP の概念については、 OASIS Advanced Messaging Queuing Protocol (AMQP) バージョン 1.0 に詳しく記載されています。

スレッド セーフ

EventHubProducerClient または EventHubConsumerClient がスレッド セーフであることを保証するものではありません。 スレッド間でこれらのインスタンスを再利用することはお勧めしません。 これらのクラスをスレッド セーフな方法で使用するのは、実行中のアプリケーション次第です。

データ モデル型はスレッド EventDataBatch セーフではありません。 スレッド間で共有したり、クライアント メソッドと同時に使用したりしないでください。

次のセクションでは、次のような最も一般的な Event Hubs タスクをカバーするいくつかのコード スニペットを示します。

Event Hub を検査する

イベント ハブのパーティション ID を取得します。

import os
from azure.eventhub import EventHubConsumerClient

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group='$Default',
    eventhub_name=EVENTHUB_NAME,
)

with consumer_client:
    pass # consumer_client is now ready to be used.

イベントを Event Hub に発行する

メソッドEventHubProducerClientcreate_batch使用してオブジェクトをEventDataBatch作成し、 メソッドをsend_batch使用して送信できます。 バッチ サイズの EventDataBatch 上限 (バイト単位) に達するまで、 メソッドを使用して add イベントを に追加できます。

def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    producer.send_batch(event_data_batch)

イベント ハブからのイベントを使用する

EventHub からイベントを使用するには、複数の方法があります。 イベントの受信時にコールバックをトリガーするだけの場合、 EventHubConsumerClient.receive メソッドは次のように使用されます。

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint(event)

with client:
    client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive(on_event=on_event, partition_id='0')

イベント ハブからのイベントをバッチで使用する

上記のサンプルでは、受信時に各メッセージのコールバックをトリガーしますが、次のサンプルでは、イベントのバッチでコールバックをトリガーし、一度に数値を受信しようとします。

import logging
from azure.eventhub import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

イベント をイベント ハブに非同期的に発行する

メソッドEventHubProducercreate_batch使用してオブジェクトをEventDataBatch作成し、 メソッドをsend_batch使用して送信できます。 バッチ サイズの EventDataBatch 上限 (バイト単位) に達するまで、 メソッドを使用して add イベントを に追加できます。

import asyncio
from azure.eventhub.aio import EventHubProducerClient  # The package name suffixed with ".aio" for async
from azure.eventhub import EventData

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

async def create_batch(client):
    event_data_batch = await client.create_batch()
    can_add = True
    while can_add:
        try:
            event_data_batch.add(EventData('Message inside EventBatchData'))
        except ValueError:
            can_add = False  # EventDataBatch object reaches max_size.
    return event_data_batch

async def send():
    client = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    batch_data = await create_batch(client)
    async with client:
        await client.send_batch(batch_data)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(send())

イベント ハブからのイベントを非同期的に使用する

この SDK では、同期ベースのコードと asyncio ベースのコードの両方がサポートされています。 上記のサンプルで示されているようにを受け取るが、aio 内では、次のものが必要です。

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event(partition_context, event):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint(event)

async def receive():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive(
            on_event=on_event,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive(on_event=on_event, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive())

イベント ハブからのイベントを非同期的にバッチ処理で使用する

すべての同期関数は、aio でもサポートされています。 上記の同期バッチ受信で示したように、asyncio 内で次のように同じ処理を実行できます。

import logging
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

async def on_event_batch(partition_context, events):
    logger.info("Received event from partition {}".format(partition_context.partition_id))
    await partition_context.update_checkpoint()

async def receive_batch():
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
        # receive events from specified partition:
        # await client.receive_batch(on_event_batch=on_event_batch, partition_id='0')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(receive_batch())

チェックポイント ストアを使用してイベントを使用し、チェックポイントを保存する

EventHubConsumerClient は、複数のパーティションから一度にイベントを受信し、同じイベント ハブとコンシューマー グループを使用して他のコンシューマーとの負荷分散を可能にする高レベルのコンストラクトです。

これにより、ユーザーはチェックポイントを使用してイベントが処理されたときの進行状況を追跡することもできます。

チェックポイントは、Event Hub インスタンス内のコンシューマー グループの特定のパーティションからユーザーが最後に正常に処理したイベントを表します。 では EventHubConsumerClient 、 の CheckpointStore インスタンスを使用してチェックポイントを更新し、負荷分散アルゴリズムに必要な関連情報を格納します。

pypi にプレフィックス azure-eventhub-checkpointstore を付けて検索し、これをサポートするパッケージを検索し、そのようなパッケージの実装を CheckpointStore 使用します。 同期ライブラリと非同期ライブラリの両方が用意されていることに注意してください。

次の例では、 の EventHubConsumerClient インスタンスを作成し、 を使用します BlobCheckpointStore。 コードを実行するには 、Azure Storage アカウントBLOB コンテナー を作成する必要があります。

Azure Blob Storage チェックポイント ストアの非同期およびAzure Blob Storage チェックポイント ストア同期は、永続的なストアとしてAzure Blob Storage適用される実装の 1 つですCheckpointStore

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING FOR THE STORAGE >>'
container_name = '<<NAME OF THE BLOB CONTAINER>>'

async def on_event(partition_context, event):
    # do something
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, container_name)
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,  # For load balancing and checkpoint. Leave None for no load balancing
    )
    async with client:
        await receive(client)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

EventHubConsumerClient を使用してIoT Hubを操作する

を使用EventHubConsumerClientして、IoT Hubを操作することもできます。 これは、リンクされた EventHub からIoT Hubのテレメトリ データを受信する場合に便利です。 関連付けられている接続文字列には送信要求がないため、イベントを送信することはできません。

接続文字列は、Event Hub と互換性のあるエンドポイント用である必要があることに注意してください。たとえば、"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"

Event Hubs 互換エンドポイントを取得するには、次の 2 つの方法があります。

  • Azure Portal でIoT Hubの "組み込みエンドポイント" を手動で取得し、そこから受信します。
from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name'
consumer_group = '<< CONSUMER GROUP >>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group)

partition_ids = client.get_partition_ids()

トラブルシューティング

さまざまな障害シナリオをazure-eventhubs診断する方法の詳細については、トラブルシューティング ガイドを参照してください。

次のステップ

その他のサンプル コード

このライブラリを使用して Event Hubs との間でイベントを送受信する方法の詳細な例については、 サンプル ディレクトリを参照してください。

ドキュメント

リファレンス ドキュメントについては、 こちらを参照してください

スキーマ レジストリと Avro エンコーダー

EventHubs SDK は、 スキーマ レジストリ サービスと Avro とうまく統合されています。 詳細については、「 スキーマ レジストリ SDK 」と 「スキーマ レジストリ Avro Encoder SDK」を参照してください。

純粋な Python AMQP トランスポートと下位互換性のサポート

Azure Event Hubs クライアント ライブラリは、純粋な Python AMQP 実装に基づいています。 uAMQP が必要な依存関係として削除されました。

基になるトランスポートとして を使用 uAMQP するには:

  1. pip を使用してインストール uamqp します。
$ pip install uamqp 
  1. クライアントの構築中にを渡します uamqp_transport=True
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name, uamqp_transport=True
)
client = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group, eventhub_name=eventhub_name, uamqp_transport=True
)

注: message 以前に を公開していた uamqp.MessageEventData/EventDataBatch属性は非推奨になりました。 によって EventData.message/EventDataBatch.message 返される "レガシ" オブジェクトは、移行を容易にするために導入されています。

ソースからの uAMQP ホイールのビルド

uAMQP が の基になる AMQP プロトコル実装azure-eventhubとして使用されることを意図している場合は、ほとんどの主要なオペレーティング システムで uAMQP ホイールが見つかります。

を使用 uAMQP する予定で、uAMQP ホイールが提供されていないプラットフォームで実行している場合は、 uAMQP インストール ガイダンスに従ってソースからインストールしてください。

フィードバックの提供

バグが発生した場合、または提案がある場合は、プロジェクトの [問題 ] セクションに問題を提出してください。

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。

インプレッション数