EventHubConsumerClient 클래스
EventHubConsumerClient 클래스는 Azure Event Hubs 서비스에서 이벤트를 수신하기 위한 상위 수준 인터페이스를 정의합니다.
EventHubConsumerClient의 기본 목표는 부하 분산 및 검사점이 있는 EventHub의 모든 파티션에서 이벤트를 수신하는 것입니다.
여러 EventHubConsumerClient 인스턴스가 동일한 이벤트 허브, 소비자 그룹 및 검사점 위치에 대해 실행되는 경우 파티션이 균등하게 분산됩니다.
부하 분산 및 지속형 검사점 사용하려면 EventHubConsumerClient를 만들 때 checkpoint_store 설정해야 합니다. 검사점 저장소가 제공되지 않으면 검사점은 메모리에서 내부적으로 유지 관리됩니다.
EventHubConsumerClient는 해당 메서드 receive() 또는 receive_batch()를 호출하고 partition_id 지정할 때 특정 파티션에서 수신할 수도 있습니다. 부하 분산은 단일 파티션 모드에서 작동하지 않습니다. 그러나 checkpoint_store 설정된 경우에도 사용자는 검사점도 저장할 수 있습니다.
- 상속
-
azure.eventhub.aio._client_base_async.ClientBaseAsyncEventHubConsumerClient
생성자
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
매개 변수
- fully_qualified_namespace
- str
Event Hubs 네임스페이스의 정규화된 호스트 이름입니다. 네임스페이스 형식은 .servicebus.windows.net.
- credential
- AsyncTokenCredential 또는 AzureSasCredential 또는 AzureNamedKeyCredential
토큰을 가져오기 위한 특정 인터페이스를 구현하는 인증에 사용되는 자격 증명 개체입니다. *get_token(자체, 범위) 메서드를 구현하는 개체 및 azure-identity 라이브러리에서 생성된 또는 자격 증명 개체를 허용EventHubSharedKeyCredential합니다.
- logging_enable
- bool
로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.
- auth_timeout
- float
서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.
- user_agent
- str
지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.
- retry_total
- int
오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다. 수신의 retry_total 컨텍스트는 특별합니다. 수신 메서드는 각 반복에서 내부 수신 메서드를 호출하는 while-loop에 의해 구현됩니다. 수신 사례에서 retry_total while 루프의 내부 수신 메서드에 의해 발생한 오류 후 재시도 횟수를 지정합니다. 재시도 시도가 모두 완료되면 오류 정보와 함께 on_error 콜백이 호출됩니다(제공된 경우). 실패한 내부 파티션 소비자는 닫힙니다(제공된 경우 on_partition_close 호출됨). 새 내부 파티션 소비자가 만들어지고(제공된 경우 on_partition_initialize 호출됨) 수신을 다시 시작합니다.
- retry_backoff_factor
- float
두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수 } - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.
- retry_backoff_max
- float
최대 백오프 시간입니다. 기본값은 120초(2분)입니다.
- retry_mode
- str
재시도 사이의 지연 동작입니다. 지원되는 값은 '고정' 또는 '지수'이며, 여기서 기본값은 '지수'입니다.
- idle_timeout
- float
시간 제한(초)이며, 이후 이 클라이언트는 추가 작업이 없는 경우 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.
- transport_type
- TransportType
Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 TransportType.AmqpOverWebsocket 을 대신 사용할 수 있습니다.
- http_proxy
HTTP 프록시 설정. 'proxy_hostname'(str value) 및'proxy_port'(int value) 키가 있는 사전이어야 합니다.
- checkpoint_store
- Optional[CheckpointStore]
이벤트를 수신할 때 파티션 부하 분산 및 검사점 데이터를 저장하는 관리자입니다. 검사점 저장소는 모든 파티션 또는 단일 파티션에서 수신하는 두 경우 모두에 사용됩니다. 후자의 경우 부하 분산이 적용되지 않습니다. 검사점 저장소가 제공되지 않으면 검사점이 메모리에 내부적으로 유지되고 EventHubConsumerClient instance 부하 분산 없이 이벤트를 받습니다.
- load_balancing_interval
- float
부하 분산이 시작되는 경우. 두 부하 분산 평가 사이의 간격(초)입니다. 기본값은 30초입니다.
- partition_ownership_expiration_interval
- float
파티션 소유권은 이 시간(초) 후에 만료됩니다. 모든 부하 분산 평가는 소유권 만료 시간을 자동으로 연장합니다. 기본값은 6 * load_balancing_interval, 즉 기본 load_balancing_interval 사용하는 경우 180초입니다.
- load_balancing_strategy
- str 또는 LoadBalancingStrategy
부하 분산이 시작되면 이 전략을 사용하여 파티션 소유권을 클레임하고 균형을 조정합니다. greedy 전략에는 "greedy" 또는 LoadBalancingStrategy.GREEDY 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 부하를 분산하는 데 필요한 클레임되지 않은 파티션을 많이 가져옵니다. 균형 잡힌 전략에는 "balanced" 또는 LoadBalancingStrategy.BALANCED 를 사용합니다. 이 전략은 모든 부하 분산 평가에서 다른 EventHubConsumerClient에서 클레임하지 않는 하나의 파티션만 클레임합니다. EventHub의 모든 파티션이 다른 EventHubConsumerClient 에서 클레임되고 이 클라이언트가 너무 적은 파티션을 주장한 경우 이 클라이언트는 부하 분산 전략에 관계없이 모든 부하 분산 평가에 대해 다른 클라이언트에서 하나의 파티션을 도용합니다. Greedy 전략은 기본적으로 사용됩니다.
Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.
연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 none입니다. 이 경우 certifi.where() 가 사용됩니다.
- uamqp_transport
- bool
uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.
- socket_timeout
- float
연결의 기본 소켓이 시간을 초과하기 전에 데이터를 보내고 받을 때 대기해야 하는 시간(초)입니다. 기본값은 TransportType.Amqp의 경우 0.2이고 TransportType.AmqpOverWebsocket의 경우 1입니다. 쓰기 시간 초과로 인해 EventHubsConnectionError 오류가 발생하는 경우 기본값보다 큰 를 전달해야 할 수 있습니다. 이는 고급 사용 시나리오용이며 일반적으로 기본값으로 충분해야 합니다.
예제
EventHubConsumerClient의 새 instance 만듭니다.
import os
from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
consumer_group='$Default',
eventhub_name=eventhub_name,
credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))
메서드
close |
이벤트 허브에서 이벤트 검색을 중지하고 기본 AMQP 연결 및 링크를 닫습니다. |
from_connection_string |
연결 문자열 EventHubConsumerClient를 만듭니다. |
get_eventhub_properties |
이벤트 허브의 속성을 가져옵니다. 반환된 사전의 키는 다음과 같습니다.
|
get_partition_ids |
이벤트 허브의 파티션 ID를 가져옵니다. |
get_partition_properties |
지정된 파티션의 속성을 가져옵니다. 속성 사전의 키는 다음과 같습니다.
|
receive |
선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다. |
receive_batch |
선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 일괄 처리로 받습니다. |
close
이벤트 허브에서 이벤트 검색을 중지하고 기본 AMQP 연결 및 링크를 닫습니다.
async close() -> None
반환 형식
예제
클라이언트를 닫습니다.
import os
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
# Close down the consumer handler explicitly.
await consumer.close()
from_connection_string
연결 문자열 EventHubConsumerClient를 만듭니다.
from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient
매개 변수
- eventhub_name
- str
클라이언트를 연결할 특정 이벤트 허브의 경로입니다.
- logging_enable
- bool
로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.
- http_proxy
- dict
HTTP 프록시 설정. 'proxy_hostname'(str value) 및'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.
- auth_timeout
- float
서비스에서 토큰이 승인될 때까지 대기하는 시간(초)입니다. 기본값은 60초입니다. 0으로 설정하면 클라이언트에서 시간 제한이 적용되지 않습니다.
- user_agent
- str
지정된 경우 사용자 에이전트 문자열 앞에 추가됩니다.
- retry_total
- int
오류가 발생할 때 실패한 작업을 다시 실행하려는 총 시도 횟수입니다. 기본값은 3입니다. 수신의 retry_total 컨텍스트는 특별합니다. 수신 메서드는 각 반복에서 내부 수신 메서드를 호출하는 while-loop에 의해 구현됩니다. 수신 사례에서 retry_total while 루프의 내부 수신 메서드에 의해 발생한 오류 후 재시도 횟수를 지정합니다. 재시도 시도가 모두 완료되면 오류 정보와 함께 on_error 콜백이 호출됩니다(제공된 경우). 실패한 내부 파티션 소비자는 닫힙니다(제공된 경우 on_partition_close 호출됨). 새 내부 파티션 소비자가 만들어지고(제공된 경우 on_partition_initialize 호출됨) 수신을 다시 시작합니다.
- retry_backoff_factor
- float
두 번째 시도 후 시도 간에 적용할 백오프 요소입니다(대부분의 오류는 지연 없이 두 번째 시도에 의해 즉시 해결됨). 고정 모드에서는 {backoff factor}에 대해 재시도 정책이 항상 절전 모드로 전환됩니다. '지수' 모드에서 재시도 정책은 {backoff factor} * (2 ** ({총 재시도 횟수} - 1)) 초 동안 절전 모드로 전환됩니다. backoff_factor 0.1이면 재시도 사이에 [0.0s, 0.2s, 0.4s, ...]에 대해 재시도가 절전 모드로 전환됩니다. 기본값은 0.8입니다.
- retry_backoff_max
- float
최대 백오프 시간입니다. 기본값은 120초(2분)입니다.
- retry_mode
- str
재시도 사이의 지연 동작입니다. 지원되는 값은 'fixed' 또는 'exponential'입니다. 여기서 기본값은 '지수'입니다.
- idle_timeout
- float
시간 제한(초)이며, 이후 이 클라이언트는 추가 작업이 없는 경우 기본 연결을 닫습니다. 기본적으로 값은 None입니다. 즉, 서비스에서 시작하지 않는 한 비활성으로 인해 클라이언트가 종료되지 않습니다.
- transport_type
- TransportType
Event Hubs 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp 이며 이 경우 포트 5671이 사용됩니다. 네트워크 환경에서 포트 5671을 사용할 수 없거나 차단된 경우 통신을 위해 포트 443을 사용하는 대신 TransportType.AmqpOverWebsocket 을 사용할 수 있습니다.
- checkpoint_store
- Optional[CheckpointStore]
이벤트를 수신할 때 파티션 부하 분산 및 검사점 데이터를 저장하는 관리자입니다. 검사점 저장소는 모든 파티션 또는 단일 파티션에서 수신하는 두 경우 모두에 사용됩니다. 후자의 경우 부하 분산이 적용되지 않습니다. 검사점 저장소가 제공되지 않으면 검사점이 메모리에서 내부적으로 유지 관리되고 EventHubConsumerClient instance 부하 분산 없이 이벤트를 수신합니다.
- load_balancing_interval
- float
부하 분산이 시작되는 경우. 두 부하 분산 평가 사이의 간격(초)입니다. 기본값은 30초입니다.
- partition_ownership_expiration_interval
- float
파티션 소유권은 이 시간(초) 후에 만료됩니다. 모든 부하 분산 평가는 소유권 만료 시간을 자동으로 연장합니다. 기본값은 6 * load_balancing_interval, 즉 30초의 기본 load_balancing_interval 사용하는 경우 180초입니다.
- load_balancing_strategy
- str 또는 LoadBalancingStrategy
부하 분산이 시작되면 이 전략을 사용하여 파티션 소유권을 클레임하고 균형을 조정합니다. 탐욕 전략에는 "greedy" 또는 LoadBalancingStrategy.GREEDY 를 사용합니다. 이 전략은 모든 부하 분산 평가에 대해 부하를 분산하는 데 필요한 클레임되지 않은 파티션을 많이 가져옵니다. 균형 잡힌 전략에는 "balanced" 또는 LoadBalancingStrategy.BALANCED 를 사용합니다. 이 전략은 모든 부하 분산 평가에 대해 다른 EventHubConsumerClient에서 클레임하지 않는 하나의 파티션만 클레임합니다. EventHub의 모든 파티션이 다른 EventHubConsumerClient 에서 클레임되고 이 클라이언트가 너무 적은 파티션을 주장한 경우 이 클라이언트는 부하 분산 전략에 관계없이 모든 부하 분산 평가에 대해 다른 클라이언트에서 하나의 파티션을 도용합니다. Greedy 전략은 기본적으로 사용됩니다.
Event Hubs 서비스에 대한 연결을 설정하는 데 사용할 사용자 지정 엔드포인트 주소로, 호스트 환경에 필요한 애플리케이션 게이트웨이 또는 기타 경로를 통해 네트워크 요청을 라우팅할 수 있습니다. 기본값은 None입니다. 형식은 "sb://< custom_endpoint_hostname>:<custom_endpoint_port>"와 같습니다. 포트가 custom_endpoint_address 지정되지 않은 경우 기본적으로 포트 443이 사용됩니다.
연결 엔드포인트의 ID를 인증하는 데 사용되는 SSL 인증서의 사용자 지정 CA_BUNDLE 파일 경로입니다. 기본값은 None입니다. 이 경우 certifi.where() 가 사용됩니다.
- uamqp_transport
- bool
uamqp 라이브러리를 기본 전송으로 사용할지 여부입니다. 기본값은 False이고 Pure Python AMQP 라이브러리는 기본 전송으로 사용됩니다.
반환 형식
예제
연결 문자열 EventHubConsumerClient의 새 instance 만듭니다.
import os
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
이벤트 허브의 속성을 가져옵니다.
반환된 사전의 키는 다음과 같습니다.
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
async get_eventhub_properties() -> Dict[str, Any]
반환
이벤트 허브에 대한 정보가 포함된 사전입니다.
반환 형식
예외
get_partition_ids
이벤트 허브의 파티션 ID를 가져옵니다.
async get_partition_ids() -> List[str]
반환
파티션 ID 목록입니다.
반환 형식
예외
get_partition_properties
지정된 파티션의 속성을 가져옵니다.
속성 사전의 키는 다음과 같습니다.
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (부울)
async get_partition_properties(partition_id: str) -> Dict[str, Any]
매개 변수
반환
파티션 속성을 포함하는 사전입니다.
반환 형식
예외
receive
선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 수신합니다.
async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
매개 변수
- on_event
- Callable[PartitionContext, Optional[EventData]]
수신된 이벤트를 처리하기 위한 콜백 함수입니다. 콜백은 파티션 컨텍스트와 수신된 이벤트인 이벤트를 포함하는 partition_context 두 매개 변수를 사용합니다. 콜백 함수는 on_event(partition_context, 이벤트)와 같이 정의해야 합니다. 자세한 파티션 컨텍스트 정보는 를 PartitionContext참조하세요.
- max_wait_time
- float
콜백을 호출하기 전에 이벤트 프로세서가 대기하는 최대 간격(초)입니다. 이 간격 내에 이벤트가 수신되지 않으면 없음을 사용하여 on_event 콜백이 호출됩니다. 이 값을 None 또는 0(기본값)으로 설정하면 이벤트가 수신될 때까지 콜백이 호출되지 않습니다.
- partition_id
- str
지정된 경우 클라이언트는 이 파티션에서만 수신됩니다. 그렇지 않으면 클라이언트가 모든 파티션에서 수신됩니다.
- owner_level
- int
독점 소비자의 우선 순위입니다. owner_level 설정된 경우 단독 소비자가 만들어집니다. owner_level 높은 소비자는 배타적 우선 순위가 높습니다. 소유자 수준은 소비자의 'epoch 값'으로도 알고 있습니다.
- prefetch
- int
처리를 위해 서비스에서 프리페치할 이벤트 수입니다. 기본값은 300입니다.
- track_last_enqueued_event_properties
- bool
소비자가 연결된 파티션에서 마지막으로 큐에 추가된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부를 나타냅니다. 마지막으로 큐에 추가된 파티션 이벤트에 대한 정보를 추적할 때 Event Hubs 서비스에서 받은 각 이벤트는 파티션에 대한 메타데이터를 전달합니다. 이로 인해 소량의 추가 네트워크 대역폭 소비가 발생하며, 이벤트 허브 클라이언트를 사용하여 파티션 속성에 대해 주기적으로 요청하는 것을 고려할 때 일반적으로 양호한 절차입니다. 기본적으로 False 로 설정됩니다.
파티션에 대한 검사점 데이터가 없는 경우 이 이벤트 위치에서 수신을 시작합니다. 검사점 데이터는 사용 가능한 경우 사용됩니다. 파티션 ID를 키로 사용하고 개별 파티션의 값으로 위치를 지정하는 받아쓰기 또는 모든 파티션에 대한 단일 값일 수 있습니다. 값 형식은 str, int 또는 datetime.datetime일 수 있습니다. 또한 스트림의 시작 부분에서 수신하기 위한 값 "-1"과 새 이벤트만 수신하기 위한 "@latest" 값도 지원됩니다.
지정된 starting_position 포함(>=)인지 여부(>)를 확인합니다. 포함의 경우 True이고, False이면 배타적입니다. 파티션 ID를 키로 사용하고 bool을 특정 파티션에 대한 starting_position 포함 여부를 나타내는 값으로 사용하는 받아쓰기일 수 있습니다. 이 값은 모든 starting_position 단일 부울 값일 수도 있습니다. 기본값은 False입니다.
- on_error
- Callable[[PartitionContext, Exception]]
재시도 시도가 소진된 후 또는 부하 분산 프로세스 중에 수신하는 동안 오류가 발생할 때 호출되는 콜백 함수입니다. 콜백은 두 개의 매개 변수를 사용합니다. partition_context 파티션 정보를 포함하고 오류가 예외입니다. 부하 분산 프로세스 중에 오류가 발생하면 partition_context 없음일 수 있습니다. 콜백은 다음과 같이 정의해야 합니다. on_error(partition_context, 오류). on_event 콜백 중에 처리되지 않은 예외가 발생하는 경우에도 on_error 콜백이 호출됩니다.
- on_partition_initialize
- Callable[[PartitionContext]]
특정 파티션에 대한 소비자가 초기화를 완료한 후 호출되는 콜백 함수입니다. 또한 실패하고 닫힌 내부 파티션 소비자에 대한 수신 프로세스를 인수하기 위해 새 내부 파티션 소비자를 만들 때 호출됩니다. 콜백은 단일 매개 변수인 partition_context 파티션 정보를 포함합니다. 콜백은 on_partition_initialize(partition_context)와 같이 정의해야 합니다.
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
특정 파티션에 대한 소비자가 닫힌 후 호출될 콜백 함수입니다. 재시도를 모두 마친 후 수신하는 동안 오류가 발생할 때도 호출됩니다. 콜백은 파티션 정보와 종료 이유를 포함하는 partition_context 두 매개 변수를 사용합니다. 콜백은 on_partition_close(partition_context, 이유)와 같이 정의해야 합니다. 닫는 다양한 이유로 을 참조 CloseReason 하세요.
반환 형식
예제
EventHub에서 이벤트를 받습니다.
logger = logging.getLogger("azure.eventhub")
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
async with consumer:
await consumer.receive(
on_event=on_event,
starting_position="-1", # "-1" is from the beginning of the partition.
)
receive_batch
선택적 부하 분산 및 검사점 지정을 사용하여 파티션에서 이벤트를 일괄 처리로 받습니다.
async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None
매개 변수
- on_event_batch
- Callable[PartitionContext, List[EventData]]
수신된 이벤트의 일괄 처리를 처리하기 위한 콜백 함수입니다. 콜백은 파티션 컨텍스트를 포함하는 partition_context 수신된 이벤트인 event_batch 두 매개 변수를 사용합니다. 콜백 함수는 다음과 같이 정의해야 합니다. on_event_batch(partition_context, event_batch). event_batch max_wait_time 없음 또는 0이 아니고 max_wait_time후에 이벤트가 수신되지 않는 경우 빈 목록이 될 수 있습니다. 자세한 파티션 컨텍스트 정보는 를 PartitionContext참조하세요.
- max_batch_size
- int
콜 백 on_event_batch 전달된 일괄 처리의 최대 이벤트 수입니다. 실제로 받은 이벤트 수가 max_batch_size보다 큰 경우 수신된 이벤트는 일괄 처리로 나뉘며 최대 max_batch_size 이벤트가 있는 각 일괄 처리에 대한 콜백을 호출합니다.
- max_wait_time
- float
콜백을 호출하기 전에 이벤트 프로세서가 대기하는 최대 간격(초)입니다. 이 간격 내에 이벤트가 수신되지 않으면 on_event_batch 콜백이 빈 목록으로 호출됩니다. 이 값을 None 또는 0(기본값)으로 설정하면 이벤트가 수신될 때까지 콜백이 호출되지 않습니다.
- partition_id
- str
지정된 경우 클라이언트는 이 파티션에서만 수신됩니다. 그렇지 않으면 클라이언트가 모든 파티션에서 수신됩니다.
- owner_level
- int
독점 소비자의 우선 순위입니다. owner_level 설정된 경우 단독 소비자가 만들어집니다. owner_level 높은 소비자는 배타적 우선 순위가 높습니다. 소유자 수준은 소비자의 'epoch 값'으로도 알고 있습니다.
- prefetch
- int
처리를 위해 서비스에서 프리페치할 이벤트 수입니다. 기본값은 300입니다.
- track_last_enqueued_event_properties
- bool
소비자가 연결된 파티션에서 마지막으로 큐에 추가된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부를 나타냅니다. 마지막으로 큐에 추가된 파티션 이벤트에 대한 정보를 추적할 때 Event Hubs 서비스에서 받은 각 이벤트는 파티션에 대한 메타데이터를 전달합니다. 이로 인해 소량의 추가 네트워크 대역폭 소비가 발생하며, 이벤트 허브 클라이언트를 사용하여 파티션 속성에 대해 주기적으로 요청하는 것을 고려할 때 일반적으로 양호한 절차입니다. 기본적으로 False 로 설정됩니다.
파티션에 대한 검사점 데이터가 없는 경우 이 이벤트 위치에서 수신을 시작합니다. 검사점 데이터는 사용 가능한 경우 사용됩니다. 파티션 ID를 키로 사용하고 개별 파티션의 값으로 위치를 지정하는 받아쓰기 또는 모든 파티션에 대한 단일 값일 수 있습니다. 값 형식은 str, int 또는 datetime.datetime일 수 있습니다. 또한 스트림의 시작 부분에서 수신하기 위한 값 "-1"과 새 이벤트만 수신하기 위한 "@latest" 값도 지원됩니다.
지정된 starting_position 포함(>=)인지 여부(>)를 확인합니다. 포함의 경우 True이고, False이면 배타적입니다. 파티션 ID를 키로 사용하고 bool을 특정 파티션에 대한 starting_position 포함 여부를 나타내는 값으로 사용하는 받아쓰기일 수 있습니다. 이 값은 모든 starting_position 단일 부울 값일 수도 있습니다. 기본값은 False입니다.
- on_error
- Callable[[PartitionContext, Exception]]
재시도 시도가 소진된 후 또는 부하 분산 프로세스 중에 수신하는 동안 오류가 발생할 때 호출되는 콜백 함수입니다. 콜백은 두 개의 매개 변수를 사용합니다. partition_context 파티션 정보를 포함하고 오류가 예외입니다. 부하 분산 프로세스 중에 오류가 발생하면 partition_context 없음일 수 있습니다. 콜백은 다음과 같이 정의해야 합니다. on_error(partition_context, 오류). on_event 콜백 중에 처리되지 않은 예외가 발생하는 경우에도 on_error 콜백이 호출됩니다.
- on_partition_initialize
- Callable[[PartitionContext]]
특정 파티션에 대한 소비자가 초기화를 완료한 후 호출되는 콜백 함수입니다. 또한 실패하고 닫힌 내부 파티션 소비자에 대한 수신 프로세스를 인수하기 위해 새 내부 파티션 소비자를 만들 때 호출됩니다. 콜백은 단일 매개 변수인 partition_context 파티션 정보를 포함합니다. 콜백은 on_partition_initialize(partition_context)와 같이 정의해야 합니다.
- on_partition_close
- Callable[[PartitionContext, CloseReason]]
특정 파티션에 대한 소비자가 닫힌 후 호출될 콜백 함수입니다. 재시도를 모두 마친 후 수신하는 동안 오류가 발생할 때도 호출됩니다. 콜백은 파티션 정보와 종료 이유를 포함하는 partition_context 두 매개 변수를 사용합니다. 콜백은 on_partition_close(partition_context, 이유)와 같이 정의해야 합니다. 닫는 다양한 이유로 을 참조 CloseReason 하세요.
반환 형식
예제
EventHub에서 이벤트를 일괄 처리로 수신합니다.
logger = logging.getLogger("azure.eventhub")
async def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
logger.info(
"{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
)
async with consumer:
await consumer.receive_batch(
on_event_batch=on_event_batch,
starting_position="-1", # "-1" is from the beginning of the partition.
)
Azure SDK for Python