次の方法で共有


EventHubConsumerClient クラス

EventHubConsumerClient クラスは、Azure Event Hubs サービスからイベントを受信するための高レベルのインターフェイスを定義します。

EventHubConsumerClient のメイン目標は、負荷分散とチェックポイント処理を使用して EventHub のすべてのパーティションからイベントを受信することです。

同じイベント ハブ、コンシューマー グループ、チェックポイントの場所に対して複数の EventHubConsumerClient インスタンスが実行されている場合、パーティションはそれらの間で均等に分散されます。

負荷分散と永続化されたチェックポイントを有効にするには、 EventHubConsumerClient の作成時にcheckpoint_storeを設定する必要があります。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持されます。

EventHubConsumerClient は、メソッド receive() または receive_batch() を呼び出してpartition_idを指定するときに、特定のパーティションから受信することもできます。 負荷分散は、単一パーティション モードでは機能しません。 ただし、checkpoint_storeが設定されている場合でも、ユーザーはチェックポイントを保存できます。

継承
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

コンストラクター

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

パラメーター

fully_qualified_namespace
str
必須

Event Hubs 名前空間の完全修飾ホスト名。 名前空間の形式は .servicebus.windows.net です。

eventhub_name
str
必須

クライアントを接続する特定のイベント ハブのパス。

consumer_group
str
必須

このコンシューマー グループのイベント ハブからイベントを受信します。

credential
TokenCredential または AzureSasCredential または AzureNamedKeyCredential
必須

トークンを取得するための特定のインターフェイスを実装する認証に使用される資格情報オブジェクト。 azure-identity ライブラリによって生成された 、または資格情報オブジェクトと、*get_token(self, scopes) メソッドを実装するオブジェクトを受け入れますEventHubSharedKeyCredential

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

auth_timeout
float

サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信 におけるretry_total のコンテキストは特別です。receive メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループ内の内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされた場合は、エラー情報と共に on_error コールバックが呼び出されます (指定されている場合)。 失敗した内部パーティション コンシューマーは閉じられ (on_partition_close が指定されている場合は呼び出されます)、新しい内部パーティション コンシューマーが作成され (on_partition_initialize 指定 されている場合は呼び出されます)、受信を再開します。

retry_backoff_factor
float

2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s,0.2s, 0.4s, ...] がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' です。既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。 それ以降のアクティビティがない場合、このクライアントは基になる接続を閉じます。 既定では、値は None です。これは、サービスによって開始されない限り、非アクティブのためクライアントがシャットダウンしないことを意味します。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
dict[str, str または int]

HTTP プロキシ設定。 これは、 'proxy_hostname' (str 値) と ' proxy_port' (int 値) のキーを持つディクショナリである必要があります。 さらに、次のキーが存在する場合もあります: 'username'、'password'

checkpoint_store
CheckpointStore または None

イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内に内部的に保持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。

load_balancing_interval
float

負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 30 秒です。

partition_ownership_expiration_interval
float

パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり 30 秒の既定のload_balancing_intervalを使用する場合は 180 秒です。

load_balancing_strategy
str または LoadBalancingStrategy

負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を使用して、負荷分散の評価ごとに、負荷のバランスを取るために必要な解放されていないパーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED は、負荷分散の評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略に使用します。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。

custom_endpoint_address
str または None

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要な任意のアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
str または None

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、純粋な Python AMQP ライブラリが基になるトランスポートとして使用されます。

socket_timeout
float

接続の基になるソケットが、データの送受信時にタイムアウトするまで待機する時間 (秒単位)。既定値は、TransportType.Amqp の場合は 0.2、TransportType.AmqpOverWebsocket の場合は 1 です。 書き込みタイムアウトが原因で EventHubsConnectionError エラーが発生している場合は、既定値よりも大きい値を渡す必要がある場合があります。 これは高度な使用シナリオの場合であり、通常は既定値で十分です。

EventHubConsumerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

メソッド

close

イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。

from_connection_string

接続文字列から EventHubConsumerClient を作成します。

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

receive_batch

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

close

イベント ハブからのイベントの取得を停止し、基になる AMQP 接続とリンクを閉じます。

close() -> None

の戻り値の型 :

クライアントを閉じます。


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

接続文字列から EventHubConsumerClient を作成します。

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

パラメーター

conn_str
str
必須

イベント ハブの接続文字列。

consumer_group
str
必須

このコンシューマー グループのイベント ハブからイベントを受信します。

eventhub_name
str

クライアントを接続する特定のイベント ハブのパス。

logging_enable
bool

ネットワーク トレース ログをロガーに出力するかどうか。 既定値は False です。

auth_timeout
float

サービスによってトークンが承認されるまで待機する時間 (秒単位)。 既定値は 60 秒です。 0 に設定すると、クライアントからタイムアウトは適用されません。

user_agent
str

指定した場合、これはユーザー エージェント文字列の前に追加されます。

retry_total
int

エラーが発生したときに失敗した操作をやり直す試行の合計数。 既定値は 3 です。 受信 におけるretry_total のコンテキストは特別です。receive メソッドは、各イテレーションで内部 受信 メソッドを呼び出す while ループによって実装されます。 受信ケースでは、retry_totalは while ループ内の内部受信メソッドによって発生したエラー後の再試行回数を指定します。 再試行が使い果たされた場合は、エラー情報と共に on_error コールバックが呼び出されます (指定されている場合)。 失敗した内部パーティション コンシューマーは閉じられ (on_partition_close が指定されている場合は呼び出されます)、新しい内部パーティション コンシューマーが作成され (on_partition_initialize 指定 されている場合は呼び出されます)、受信を再開します。

retry_backoff_factor
float

2 回目の試行後の試行間に適用するバックオフ係数 (ほとんどのエラーは、遅延なしで 2 回目の試行によってすぐに解決されます)。 固定モードでは、再試行ポリシーは常に {backoff factor} のスリープ状態になります。 'exponential' モードでは、再試行ポリシーは {backoff factor} * (2 ** ({合計再試行回数} - 1)) 秒でスリープ状態になります。 backoff_factorが 0.1 の場合、再試行の間に [0.0s, 0.2s, 0.4s, ...] の再試行がスリープ状態になります。 既定値は 0.8 です。

retry_backoff_max
float

最大バックオフ時間。 既定値は 120 秒 (2 分) です。

retry_mode
str

再試行の間の遅延動作。 サポートされている値は 'fixed' または 'exponential' で、既定値は 'exponential' です。

idle_timeout
float

タイムアウト (秒単位)。その後、このクライアントは、ファーサー アクティビティがない場合に基になる接続を閉じます。 既定では、値は None です。つまり、サービスによって開始されない限り、非アクティブのためクライアントはシャットダウンされません。

transport_type
TransportType

Event Hubs サービスとの通信に使用されるトランスポート プロトコルの種類。 既定値は TransportType.Amqp です。この場合、ポート 5671 が使用されます。 ネットワーク環境でポート 5671 が使用できないかブロックされている場合は、 代わりに TransportType.AmqpOverWebsocket を使用して、通信にポート 443 を使用できます。

http_proxy
dict

HTTP プロキシ設定。 これは、次のキーを持つディクショナリである必要があります: 'proxy_hostname' (str 値) と 'proxy_port' (int 値)。 さらに、次のキーが存在する場合もあります: 'username'、'password'

checkpoint_store
CheckpointStore または None

イベントの受信時にパーティション負荷分散とチェックポイント データを格納するマネージャー。 チェックポイント ストアは、すべてのパーティションまたは 1 つのパーティションから受信する場合の両方で使用されます。 後者の場合、負荷分散は適用されません。 チェックポイント ストアが指定されていない場合、チェックポイントはメモリ内で内部的に維持され、 EventHubConsumerClient インスタンスは負荷分散なしでイベントを受信します。

load_balancing_interval
float

負荷分散が開始されたとき。 これは、2 つの負荷分散評価の間隔 (秒単位) です。 既定値は 10 秒です。

partition_ownership_expiration_interval
float

パーティションの所有権は、この秒数後に期限切れになります。 すべての負荷分散評価では、所有権の有効期限が自動的に延長されます。 既定値は 6 * load_balancing_interval、つまり既定のload_balancing_interval 30 秒を使用する場合は 60 秒です。

load_balancing_strategy
str または LoadBalancingStrategy

負荷分散が開始されると、この戦略を使用してパーティションの所有権を要求し、バランスを取ります。 "greedy" または LoadBalancingStrategy.GREEDY を greedy 戦略に使用します。これは、負荷分散評価ごとに、負荷のバランスを取るために必要な数の未請求パーティションを取得します。 "balanced" または LoadBalancingStrategy.BALANCED を使用して、負荷分散評価ごとに、他の EventHubConsumerClient によって要求されない 1 つのパーティションのみを要求するバランス戦略を行います。 EventHub のすべてのパーティションが他の EventHubConsumerClient によって要求され、このクライアントが要求するパーティションが少なすぎる場合、このクライアントは負荷分散戦略に関係なく、負荷分散評価ごとに他のクライアントから 1 つのパーティションを盗みます。 Greedy 戦略は既定で使用されます。

custom_endpoint_address
str または None

Event Hubs サービスへの接続を確立するために使用するカスタム エンドポイント アドレス。これにより、ホスト環境に必要なアプリケーション ゲートウェイまたはその他のパスを介してネットワーク要求をルーティングできます。 既定値はなしです。 形式は "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" のようになります。 custom_endpoint_addressで port が指定されていない場合、既定ではポート 443 が使用されます。

connection_verify
str または None

接続エンドポイントの ID を認証するために使用される SSL 証明書のカスタム CA_BUNDLE ファイルへのパス。 既定値は None で、その場合 は certifi.where() が使用されます。

uamqp_transport
bool

基になるトランスポートとして uamqp ライブラリを使用するかどうか。 既定値は False で、Pure Python AMQP ライブラリが基になるトランスポートとして使用されます。

の戻り値の型 :

接続文字列から EventHubConsumerClient の新しいインスタンスを作成します。


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

イベント ハブのプロパティを取得します。

返されるディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

戻り値

イベント ハブに関する情報を含むディクショナリ。

の戻り値の型 :

例外

get_partition_ids

イベント ハブのパーティション ID を取得します。

get_partition_ids() -> List[str]

戻り値

パーティション ID の一覧。

の戻り値の型 :

例外

get_partition_properties

指定したパーティションのプロパティを取得します。

プロパティ ディクショナリのキーは次のとおりです。

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

パラメーター

partition_id
str
必須

ターゲット パーティション ID。

戻り値

パーティション プロパティを含むディクショナリ。

の戻り値の型 :

例外

receive

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

パラメーター

on_event
callable[PartitionContext, EventData または None]
必須

受信したイベントを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベントである イベント の 2 つのパラメーターを受け取ります。 コールバック関数は、次のように定義する必要があります: on_event(partition_context, event)。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。

max_wait_time
float

イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合、 on_event コールバックは None で呼び出されます。 この値が None または 0 (既定値) に設定されている場合、イベントを受信するまでコールバックは呼び出されません。

partition_id
str

指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。

owner_level
int

排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先順位を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。

prefetch
int

処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。

track_last_enqueued_event_properties
bool

コンシューマーが関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡するかどうかを示します。 パーティションの last-enqueued イベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、Event Hub クライアントを使用してパーティション プロパティの要求を定期的に行うことを考慮すると、通常は有利なトレードオフとなる、追加のネットワーク帯域幅消費が少なくなります。 既定では False に設定されています。

starting_position
str, int, datetime または dict[str,any]

パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データが使用可能な場合は使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値として位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型は、str、int、または datetime.datetime です。 また、ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。 既定値は "@latest" です。

starting_position_inclusive
bool または dict[str,bool]

指定したstarting_positionが包括的 (=) であるかどうかを判断します (>>)。 包括の場合は True、排他的の場合は False。 これは、キーとしてパーティション ID を持つディクテーション、および特定のパーティションのstarting_positionが包括的かどうかを示す値として bool にすることができます。 これは、すべてのstarting_positionの 1 つのブール値にすることもできます。 既定値は False です。

on_error
callable[[PartitionContext, Exception]]

再試行が使い果たされた後、または負荷分散の処理中に受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー の 2 つのパラメーターを受け取ります。 負荷分散 の処理中にエラーが発生した場合、partition_contextは None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_event コールバック中に未処理の例外が発生した場合は、on_error コールバックも呼び出されます。

on_partition_initialize
callable[[PartitionContext]]

特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、障害が発生し、閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_initialize(partition_context)

on_partition_close
callable[[PartitionContext, CloseReason]]

特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。

の戻り値の型 :

EventHub からイベントを受信します。


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

オプションの負荷分散とチェックポイント処理を使用して、パーティションからイベントを受信します。

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

パラメーター

on_event_batch
callable[PartitionContext, list[EventData]]
必須

受信したイベントのバッチを処理するためのコールバック関数。 コールバックは、パーティション コンテキストを含む partition_context と、受信したイベント である event_batch という 2 つのパラメーターを受け取ります。 コールバック関数は、 on_event_batch(partition_context、event_batch) のように定義する必要があります。 max_wait_time が None でも 0 でもなく、 max_wait_time 後にイベントが受信されない場合、 event_batchは空のリストになる可能性があります。 パーティション コンテキストの詳細については、 を PartitionContext参照してください。

max_batch_size
int

コールバック on_event_batchに渡されるバッチ内のイベントの最大数。 実際に受信したイベント数が max_batch_sizeを超える場合、受信したイベントはバッチに分割され、最大 max_batch_size イベントを含む各バッチのコールバックが呼び出されます。

max_wait_time
float

イベント プロセッサがコールバックを呼び出す前に待機する最大間隔 (秒単位)。 この期間内にイベントが受信されない場合は、空のリストを 使用してon_event_batch コールバックが呼び出されます。

partition_id
str

指定した場合、クライアントはこのパーティションからのみ受信します。 それ以外の場合、クライアントはすべてのパーティションから受信します。

owner_level
int

排他的コンシューマーの優先順位。 owner_levelが設定されている場合は、排他的コンシューマーが作成されます。 より高いowner_levelを持つコンシューマーは、より高い排他的優先度を持ちます。 所有者レベルは、コンシューマーの "エポック値" とも認識されます。

prefetch
int

処理のためにサービスからプリフェッチするイベントの数。 既定値は 300 です。

track_last_enqueued_event_properties
bool

コンシューマーが、関連付けられたパーティションで最後にエンキューされたイベントに関する情報を要求し、イベントの受信時にその情報を追跡する必要があるかどうかを示します。 パーティションの最後にエンキューされたイベントに関する情報が追跡されている場合、Event Hubs サービスから受信した各イベントは、パーティションに関するメタデータを保持します。 これにより、イベント ハブ クライアントを使用してパーティション プロパティの要求を定期的に行う場合と見なされる場合、通常は有利なトレードオフとなるネットワーク帯域幅の使用量が少なくなります。 既定では False に設定されています。

starting_position
str, int, datetime または dict[str,any]

パーティションのチェックポイント データがない場合は、このイベント位置からの受信を開始します。 チェックポイント データは、使用可能な場合に使用されます。 これは、キーとしてパーティション ID を持ち、個々のパーティションの値としての位置を持つディクテーション、またはすべてのパーティションに対して 1 つの値にすることができます。 値の型には、str、int、datetime.datetime を指定できます。 ストリームの先頭から受信するための値 "-1" と、新しいイベントのみを受信するための "@latest" もサポートされています。 既定値は "@latest" です。

starting_position_inclusive
bool または dict[str,bool]

指定したstarting_positionが inclusive(=) であるかどうかを判断します (>>)。 インクルーシブの場合は True、排他の場合は False。 これは、パーティション ID をキーとして、値として bool を指定して、特定のパーティションのstarting_positionが包括的かどうかを示す dict にすることができます。 これは、すべてのstarting_positionに対して 1 つのブール値にすることもできます。 既定値は False です。

on_error
callable[[PartitionContext, Exception]]

再試行が使い果たされた後、または負荷分散の処理中に、受信中にエラーが発生したときに呼び出されるコールバック関数。 コールバックは、パーティション情報を含む partition_context と例外である エラー という 2 つのパラメーターを受け取ります。 負荷分散の 処理中にエラーが発生した場合は、partition_context None になる可能性があります。 コールバックは次のように定義する必要があります: on_error(partition_context, error). on_errorコールバックは、on_eventコールバック中にハンドルされない例外が発生した場合にも呼び出されます。

on_partition_initialize
callable[[PartitionContext]]

特定のパーティションのコンシューマーが初期化を完了した後に呼び出されるコールバック関数。 また、失敗した閉じられた内部パーティション コンシューマーの受信プロセスを引き継ぐ新しい内部パーティション コンシューマーが作成されるときにも呼び出されます。 コールバックは、パーティション情報を含む partition_context という 1 つのパラメーターを受け取ります。 コールバックは、 on_partition_initialize(partition_context)のように定義する必要があります。

on_partition_close
callable[[PartitionContext, CloseReason]]

特定のパーティションのコンシューマーが閉じられた後に呼び出されるコールバック関数。 再試行が使い果たされた後の受信中にエラーが発生した場合にも呼び出されます。 コールバックは、パーティション情報とクローズの理由を含む partition_context という 2 つのパラメーターを受け取ります。 コールバックは次のように定義する必要があります: on_partition_close(partition_context, reason). 閉会の理由はこちらをご覧 CloseReason ください。

の戻り値の型 :

EventHub からバッチでイベントを受信します。


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)