EventHubConsumerClient Class

The EventHubConsumerClient class defines a high level interface for receiving events from the Azure Event Hubs service.

The main goal of EventHubConsumerClient is to receive events from all partitions of an EventHub with load-balancing and checkpointing.

When multiple EventHubConsumerClient instances are running against the same event hub, consumer group and checkpointing location, the partitions will be evenly distributed among them.

To enable load-balancing and persisted checkpoints, checkpoint_store must be set when creating the EventHubConsumerClient. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory.

An EventHubConsumerClient can also receive from a specific partition when you call its receive() or receive_batch() method and specify the partition_id. Load-balancing won't work in single-partition mode, but users can still save checkpoints if the checkpoint_store is set.
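
As a rough illustration of single-partition mode, the sketch below wraps receive() with a partition_id. It uses only parameters documented on this page. The helper name receive_one_partition and the handler body are illustrative assumptions, and consumer is assumed to be an already constructed EventHubConsumerClient.

```python
import logging

logger = logging.getLogger("azure.eventhub")


async def on_event(partition_context, event):
    # Called once per event received from the chosen partition.
    logger.info("Received event from partition: %s", partition_context.partition_id)


async def receive_one_partition(consumer, partition_id):
    # Hypothetical helper: `consumer` is an existing EventHubConsumerClient.
    # Passing partition_id selects single-partition mode, so no
    # load-balancing takes place for this call.
    async with consumer:
        await consumer.receive(
            on_event=on_event,
            partition_id=partition_id,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
```

Because receive() blocks while awaited, a helper like this would normally be run as a task and stopped by cancelling it or closing the client.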

Inheritance
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Constructor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parameters

Name Description
fully_qualified_namespace
Required
str

The fully qualified host name for the Event Hubs namespace. The namespace format is: <yournamespace>.servicebus.windows.net.

eventhub_name
Required
str

The path of the specific Event Hub to connect the client to.

consumer_group
Required
str

Receive events from the event hub for this consumer group.

credential
Required

The credential object used for authentication, which implements a particular interface for getting tokens. It accepts EventHubSharedKeyCredential, credential objects generated by the azure-identity library, and objects that implement the get_token(self, *scopes) method.

Keyword-Only Parameters

Name Description
logging_enable

Whether to output network trace logs to the logger. Default is False.

auth_timeout

The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

user_agent
str

If specified, this will be added in front of the user agent string.

retry_total
int

The total number of attempts to redo a failed operation when an error occurs. Default value is 3. The context of retry_total in receiving is special: the receive method is implemented by a while-loop that calls an internal receive method in each iteration. In the receive case, retry_total specifies the number of retries after an error is raised by the internal receive method in the while-loop. If retry attempts are exhausted, the on_error callback will be called (if provided) with the error information. The failed internal partition consumer will be closed (on_partition_close will be called if provided) and a new internal partition consumer will be created (on_partition_initialize will be called if provided) to resume receiving.

retry_backoff_factor

A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In 'exponential' mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.

retry_backoff_max

The maximum back off time. Default value is 120 seconds (2 minutes).

retry_mode
str

The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential', where default is 'exponential'.

idle_timeout

Timeout, in seconds, after which this client will close the underlying connection if there is no further activity. By default the value is None, meaning that the client will not shut down due to inactivity unless initiated by the service.

transport_type

The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

http_proxy

HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value).

checkpoint_store

A manager that stores the partition load-balancing and checkpoint data when receiving events. The checkpoint store will be used in both cases of receiving from all partitions or a single partition. In the latter case load-balancing does not apply. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory, and the EventHubConsumerClient instance will receive events without load-balancing.

load_balancing_interval

The interval, in seconds, between two load-balancing evaluations once load-balancing is active. Default is 30 seconds.

partition_ownership_expiration_interval

A partition ownership will expire after this number of seconds. Every load-balancing evaluation will automatically extend the ownership expiration time. Default is 6 * load_balancing_interval, i.e. 180 seconds when using the default load_balancing_interval of 30 seconds.

load_balancing_strategy

When load-balancing kicks in, it will use this strategy to claim and balance the partition ownership. Use "greedy" or LoadBalancingStrategy.GREEDY for the greedy strategy, which, for every load-balancing evaluation, will grab as many unclaimed partitions as required to balance the load. Use "balanced" or LoadBalancingStrategy.BALANCED for the balanced strategy, which, for every load-balancing evaluation, claims only one partition that is not claimed by another EventHubConsumerClient. If all partitions of an EventHub are claimed by other EventHubConsumerClient instances and this client has claimed too few partitions, this client will steal one partition from other clients for every load-balancing evaluation regardless of the load-balancing strategy. Greedy strategy is used by default.

custom_endpoint_address

The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>". If port is not specified in the custom_endpoint_address, by default port 443 will be used.

connection_verify

Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

ssl_context

The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

uamqp_transport

Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.

socket_timeout

The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If EventHubsConnectionError errors are occurring due to write timing out, a larger than default value may need to be passed in. This is for advanced usage scenarios and ordinarily the default value should be sufficient.
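
To make the interaction of retry_backoff_factor, retry_backoff_max and retry_mode described above more concrete, here is a minimal sketch that applies the documented formula literally. The function documented_backoff is a hypothetical helper, not part of azure-eventhub, and it may differ from the library's internal calculation. It also assumes the cap applies in both modes and does not model the immediate first retry mentioned in the description.

```python
def documented_backoff(retry_number, backoff_factor=0.8, backoff_max=120.0, mode="exponential"):
    """Delay in seconds before the given retry, following the documented formula.

    Hypothetical helper for illustration only; not part of azure-eventhub.
    """
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    if mode == "fixed":
        # Fixed mode always sleeps for the backoff factor.
        delay = backoff_factor
    elif mode == "exponential":
        # {backoff factor} * (2 ** ({number of total retries} - 1))
        delay = backoff_factor * (2 ** (retry_number - 1))
    else:
        raise ValueError("mode must be 'fixed' or 'exponential'")
    # Assumption: retry_backoff_max caps the delay in either mode.
    return min(delay, backoff_max)


# Delays for the first three retries with the default settings.
delays = [documented_backoff(n) for n in range(1, 4)]
print(delays)
```

With the defaults, the delay doubles on each retry until it reaches retry_backoff_max, after which it stays at 120 seconds.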

Examples

Create a new instance of the EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
   eventhub_name = os.environ["EVENT_HUB_NAME"]
   shared_access_policy = os.environ["EVENT_HUB_SAS_POLICY"]
   shared_access_key = os.environ["EVENT_HUB_SAS_KEY"]

   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       consumer_group="$Default",
       eventhub_name=eventhub_name,
       credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key),
   )

Methods

close

Stop retrieving events from the Event Hub and close the underlying AMQP connection and links.

from_connection_string

Create an EventHubConsumerClient from a connection string.

get_eventhub_properties

Get properties of the Event Hub.

Keys in the returned dictionary include:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Get partition IDs of the Event Hub.

get_partition_properties

Get properties of the specified partition.

Keys in the properties dictionary include:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Receive events from partition(s), with optional load-balancing and checkpointing.

receive_batch

Receive events from partition(s) in batches, with optional load-balancing and checkpointing.

close

Stop retrieving events from the Event Hub and close the underlying AMQP connection and links.

async close() -> None

Returns

Type Description

Examples

Close down the client.


   import asyncio
   import logging
   import os

   from azure.eventhub.aio import EventHubConsumerClient
   from azure.identity.aio import DefaultAzureCredential

   fully_qualified_namespace = os.environ["EVENT_HUB_HOSTNAME"]
   eventhub_name = os.environ["EVENT_HUB_NAME"]

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is I/O intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   async def main():
       consumer = EventHubConsumerClient(
           fully_qualified_namespace=fully_qualified_namespace,
           consumer_group="$Default",
           eventhub_name=eventhub_name,
           credential=DefaultAzureCredential(),
       )

       # The receive method is a coroutine which will be blocking when awaited.
       # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
       recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
       await asyncio.sleep(3)  # keep receiving for 3 seconds
       recv_task.cancel()  # stop receiving

       # Close down the consumer handler explicitly.
       await consumer.close()

   asyncio.run(main())

from_connection_string

Create an EventHubConsumerClient from a connection string.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, retry_backoff_factor: float = 0.8, retry_backoff_max: float = 120, retry_mode: Literal['exponential', 'fixed'] = 'exponential', idle_timeout: float | None = None, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, partition_ownership_expiration_interval: float | None = None, load_balancing_strategy: str | LoadBalancingStrategy = LoadBalancingStrategy.GREEDY, custom_endpoint_address: str | None = None, connection_verify: str | None = None, ssl_context: 'SSLContext' | None = None, uamqp_transport: bool = False, **kwargs: Any) -> EventHubConsumerClient

Parameters

Name Description
conn_str
Required
str

The connection string of an Event Hub.

consumer_group
Required
str

Receive events from the Event Hub for this consumer group.

Keyword-Only Parameters

Name Description
eventhub_name
str

The path of the specific Event Hub to connect the client to.

logging_enable

Whether to output network trace logs to the logger. Default is False.

http_proxy

HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'.

auth_timeout

The time in seconds to wait for a token to be authorized by the service. The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.

Default value: 60
user_agent
str

If specified, this will be added in front of the user agent string.

retry_total
int

The total number of attempts to redo a failed operation when an error occurs. Default value is 3. The context of retry_total in receiving is special: the receive method is implemented by a while-loop that calls an internal receive method in each iteration. In the receive case, retry_total specifies the number of retries after an error is raised by the internal receive method in the while-loop. If retry attempts are exhausted, the on_error callback will be called (if provided) with the error information. The failed internal partition consumer will be closed (on_partition_close will be called if provided) and a new internal partition consumer will be created (on_partition_initialize will be called if provided) to resume receiving.

Default value: 3
retry_backoff_factor

A backoff factor to apply between attempts after the second try (most errors are resolved immediately by a second try without a delay). In fixed mode, retry policy will always sleep for {backoff factor}. In 'exponential' mode, retry policy will sleep for: {backoff factor} * (2 ** ({number of total retries} - 1)) seconds. If the backoff_factor is 0.1, then the retry will sleep for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.

Default value: 0.8
retry_backoff_max

The maximum back off time. Default value is 120 seconds (2 minutes).

Default value: 120
retry_mode
str

The delay behavior between retry attempts. Supported values are 'fixed' or 'exponential', where default is 'exponential'.

Default value: exponential
idle_timeout

Timeout, in seconds, after which this client will close the underlying connection if there is no further activity. By default the value is None, meaning that the client will not shut down due to inactivity unless initiated by the service.

transport_type

The type of transport protocol that will be used for communicating with the Event Hubs service. Default is TransportType.Amqp in which case port 5671 is used. If the port 5671 is unavailable/blocked in the network environment, TransportType.AmqpOverWebsocket could be used instead which uses port 443 for communication.

Default value: TransportType.Amqp
checkpoint_store

A manager that stores the partition load-balancing and checkpoint data when receiving events. The checkpoint store will be used in both cases of receiving from all partitions or a single partition. In the latter case load-balancing does not apply. If a checkpoint store is not provided, the checkpoint will be maintained internally in memory, and the EventHubConsumerClient instance will receive events without load-balancing.

load_balancing_interval

The interval, in seconds, between two load-balancing evaluations once load-balancing is active. Default is 30 seconds.

Default value: 30
partition_ownership_expiration_interval

A partition ownership will expire after this number of seconds. Every load-balancing evaluation will automatically extend the ownership expiration time. Default is 6 * load_balancing_interval, i.e. 180 seconds when using the default load_balancing_interval of 30 seconds.

load_balancing_strategy

When load-balancing kicks in, it will use this strategy to claim and balance the partition ownership. Use "greedy" or LoadBalancingStrategy.GREEDY for the greedy strategy, which, for every load-balancing evaluation, will grab as many unclaimed partitions as required to balance the load. Use "balanced" or LoadBalancingStrategy.BALANCED for the balanced strategy, which, for every load-balancing evaluation, claims only one partition that is not claimed by another EventHubConsumerClient. If all partitions of an EventHub are claimed by other EventHubConsumerClient instances and this client has claimed too few partitions, this client will steal one partition from other clients for every load-balancing evaluation regardless of the load-balancing strategy. Greedy strategy is used by default.

Default value: LoadBalancingStrategy.GREEDY
custom_endpoint_address

The custom endpoint address to use for establishing a connection to the Event Hubs service, allowing network requests to be routed through any application gateways or other paths needed for the host environment. Default is None. The format would be like "sb://<custom_endpoint_hostname>:<custom_endpoint_port>". If port is not specified in the custom_endpoint_address, by default port 443 will be used.

connection_verify

Path to the custom CA_BUNDLE file of the SSL certificate which is used to authenticate the identity of the connection endpoint. Default is None in which case certifi.where() will be used.

ssl_context

The SSLContext object to use in the underlying Pure Python AMQP transport. If specified, connection_verify will be ignored.

uamqp_transport

Whether to use the uamqp library as the underlying transport. The default value is False and the Pure Python AMQP library will be used as the underlying transport.
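
The difference between the greedy and balanced values of load_balancing_strategy described above can be sketched with a deliberately simplified model of one load-balancing evaluation for a single client. The function partitions_to_claim and its even-share calculation are assumptions made for illustration. The library's actual algorithm also deals with ownership expiration and uneven remainders, which are not modelled here.

```python
def partitions_to_claim(strategy, total_partitions, active_clients, owned, unclaimed):
    """Simplified model of one load-balancing evaluation for one client.

    Hypothetical helper, not the library's algorithm.
    Returns (number_claimed_from_unclaimed, number_stolen_from_other_clients).
    """
    if strategy not in ("greedy", "balanced"):
        raise ValueError("strategy must be 'greedy' or 'balanced'")
    # Assumption: every client aims for at least an even share of partitions.
    fair_share = total_partitions // active_clients
    deficit = max(fair_share - owned, 0)
    if deficit == 0:
        return (0, 0)  # already holding a fair share
    if unclaimed > 0:
        # Greedy grabs as many unclaimed partitions as needed; balanced takes one.
        claimed = min(deficit, unclaimed) if strategy == "greedy" else 1
        return (claimed, 0)
    # Nothing is unclaimed: steal one partition regardless of strategy.
    return (0, 1)


print(partitions_to_claim("greedy", 8, 2, owned=0, unclaimed=8))
print(partitions_to_claim("balanced", 8, 2, owned=0, unclaimed=8))
```

In this model a new greedy client reaches its share in one evaluation, while a balanced client needs one evaluation per partition, which trades start-up speed for smaller ownership changes per interval.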

Returns

Type Description

An EventHubConsumerClient instance.

Examples

Create a new instance of the EventHubConsumerClient from a connection string.


   import os
   from azure.eventhub.aio import EventHubConsumerClient

   # EVENT_HUB_CONN_STR is an assumed environment variable name that holds the connection string.
   conn_str = os.environ["EVENT_HUB_CONN_STR"]
   eventhub_name = os.environ["EVENT_HUB_NAME"]
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name,  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Get properties of the Event Hub.

Keys in the returned dictionary include:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Returns

Type Description

A dictionary containing information about the Event Hub.

Exceptions

Type Description

get_partition_ids

Get partition IDs of the Event Hub.

async get_partition_ids() -> List[str]

Returns

Type Description

A list of partition IDs.

Exceptions

Type Description

get_partition_properties

Get properties of the specified partition.

Keys in the properties dictionary include:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameters

Name Description
partition_id
Required
str

The target partition ID.

Returns

Type Description

A dictionary containing partition properties.
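
The two lookup methods can be combined to get a quick overview of every partition. The sketch below is one way to do that, assuming consumer is an existing EventHubConsumerClient. The helper name summarize_partitions and the choice of which keys to keep are illustrative.

```python
async def summarize_partitions(consumer):
    # Hypothetical helper: `consumer` is an existing EventHubConsumerClient.
    summary = {}
    for partition_id in await consumer.get_partition_ids():
        props = await consumer.get_partition_properties(partition_id)
        # Keep only a couple of the documented keys for a compact overview.
        summary[partition_id] = {
            "is_empty": props["is_empty"],
            "last_enqueued_sequence_number": props["last_enqueued_sequence_number"],
        }
    return summary
```

The result maps each partition ID to a small dictionary, which can be logged or used to decide which partitions are worth reading from.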

Exceptions

Type Description

receive

Receive events from partition(s), with optional load-balancing and checkpointing.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameters

Name Description
on_event
Required

The callback function for handling a received event. The callback takes two parameters: partition_context which contains partition context and event which is the received event. The callback function should be defined like: on_event(partition_context, event). For detailed partition context information, please refer to PartitionContext.
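
A handler typically needs to cope with the case where the event argument is None, which happens when max_wait_time is set and no event arrives in time. The sketch below shows one possible shape for such a callback. The log messages are illustrative, and update_checkpoint is the PartitionContext method for recording progress, which is not otherwise described on this page.

```python
import logging

logger = logging.getLogger("azure.eventhub")


async def on_event(partition_context, event):
    if event is None:
        # max_wait_time elapsed with no new event on this partition.
        logger.info("No event on partition %s within max_wait_time", partition_context.partition_id)
        return
    logger.info("Received event from partition %s", partition_context.partition_id)
    # Record progress. The checkpoint is persisted only when a checkpoint_store
    # was supplied to the client; otherwise it is kept in memory.
    await partition_context.update_checkpoint(event)
```

This handler would be passed as on_event to receive(), together with a non-zero max_wait_time if the None calls are wanted.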

Keyword-Only Parameters

Name Description
max_wait_time

The maximum interval in seconds that the event processor will wait before calling the callback. If no events are received within this interval, the on_event callback will be called with None. If this value is set to None or 0 (the default), the callback will not be called until an event is received.

partition_id
str

If specified, the client will receive from this partition only. Otherwise the client will receive from all partitions.

owner_level
int

The priority for an exclusive consumer. An exclusive consumer will be created if owner_level is set. A consumer with a higher owner_level has higher exclusive priority. The owner level is also known as the 'epoch value' of the consumer.

prefetch
int

The number of events to prefetch from the service for processing. Default is 300.

Default value: 300
track_last_enqueued_event_properties

Indicates whether the consumer should request information on the last-enqueued event on its associated partition, and track that information as events are received. When information about the partition's last-enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

starting_position

Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events.

starting_position_inclusive

Determines whether the given starting_position is inclusive (>=) or not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and bool as the value indicating whether the starting_position for a specific partition is inclusive or not. This can also be a single bool value for all starting positions. The default value is False.

on_error

The callback function that will be called when an error is raised during receiving after retry attempts are exhausted, or during the process of load-balancing. The callback takes two parameters: partition_context, which contains partition information, and error, which is the exception. partition_context could be None if the error is raised during the process of load-balancing. The callback should be defined like: on_error(partition_context, error). The on_error callback will also be called if an unhandled exception is raised during the on_event callback.

on_partition_initialize

The callback function that will be called after a consumer for a certain partition finishes initialization. It would also be called when a new internal partition consumer is created to take over the receiving process for a failed and closed internal partition consumer. The callback takes a single parameter: partition_context which contains the partition information. The callback should be defined like: on_partition_initialize(partition_context).

on_partition_close

The callback function that will be called after a consumer for a certain partition is closed. It is also called when an error is raised during receiving after retry attempts are exhausted. The callback takes two parameters: partition_context, which contains partition information, and reason, which is the reason for the close. The callback should be defined like: on_partition_close(partition_context, reason). Please refer to CloseReason for the various closing reasons.
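
The three optional callbacks described above can be wired up together. The following is a minimal sketch, assuming consumer is an existing EventHubConsumerClient and on_event is a handler defined elsewhere. The helper name receive_with_callbacks and the log messages are illustrative.

```python
import logging

logger = logging.getLogger("azure.eventhub")


async def on_partition_initialize(partition_context):
    logger.info("Partition %s initialized", partition_context.partition_id)


async def on_partition_close(partition_context, reason):
    logger.info("Partition %s closed: %s", partition_context.partition_id, reason)


async def on_error(partition_context, error):
    # partition_context may be None when the error comes from load-balancing.
    if partition_context is None:
        logger.error("Error during load-balancing: %r", error)
    else:
        logger.error("Error on partition %s: %r", partition_context.partition_id, error)


async def receive_with_callbacks(consumer, on_event):
    # Hypothetical helper: `consumer` is an existing EventHubConsumerClient.
    async with consumer:
        await consumer.receive(
            on_event=on_event,
            on_error=on_error,
            on_partition_initialize=on_partition_initialize,
            on_partition_close=on_partition_close,
        )
```

Checking for a None partition_context in on_error avoids an AttributeError masking the original load-balancing failure.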

Returns

Type Description

Examples

Receive events from the EventHub.


   import logging

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is I/O intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   async def receive_events(consumer):
       # `consumer` is an existing EventHubConsumerClient instance.
       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Receive events from partition(s) in batches, with optional load-balancing and checkpointing.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parameters

Name Description
on_event_batch
Required

The callback function for handling a batch of received events. The callback takes two parameters: partition_context, which contains partition context, and event_batch, which is the received events. The callback function should be defined like: on_event_batch(partition_context, event_batch). event_batch could be an empty list if max_wait_time is neither None nor 0 and no event is received within max_wait_time. For detailed partition context information, please refer to PartitionContext.

Keyword-Only Parameters

Name Description
max_batch_size
int

The maximum number of events in a batch passed to the on_event_batch callback. If the number of events actually received is larger than max_batch_size, the received events are divided into batches and the callback is called for each batch with up to max_batch_size events.

Default value: 300
max_wait_time

The maximum interval in seconds that the event processor will wait before calling the callback. If no events are received within this interval, the on_event_batch callback will be called with an empty list. If this value is set to None or 0 (the default), the callback will not be called until events are received.

Default value: None
partition_id
str

If specified, the client will receive from this partition only. Otherwise the client will receive from all partitions.

Default value: None
owner_level
int

The priority for an exclusive consumer. An exclusive consumer will be created if owner_level is set. A consumer with a higher owner_level has higher exclusive priority. The owner level is also known as the 'epoch value' of the consumer.

Default value: None
prefetch
int

The number of events to prefetch from the service for processing. Default is 300.

Default value: 300
track_last_enqueued_event_properties

Indicates whether the consumer should request information on the last-enqueued event on its associated partition, and track that information as events are received. When information about the partition's last-enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client. It is set to False by default.

Default value: False
starting_position

Start receiving from this event position if there is no checkpoint data for a partition. Checkpoint data will be used if available. This can be a dict with partition ID as the key and position as the value for individual partitions, or a single value for all partitions. The value type can be str, int or datetime.datetime. Also supported are the values "-1" for receiving from the beginning of the stream, and "@latest" for receiving only new events.

Default value: None
starting_position_inclusive

Determines whether the given starting_position is inclusive (>=) or not (>). True for inclusive and False for exclusive. This can be a dict with partition ID as the key and bool as the value indicating whether the starting_position for a specific partition is inclusive or not. This can also be a single bool value for all starting positions. The default value is False.

Default value: False
on_error

The callback function that will be called when an error is raised during receiving after retry attempts are exhausted, or during the process of load-balancing. The callback takes two parameters: partition_context, which contains partition information, and error, which is the exception. partition_context could be None if the error is raised during the process of load-balancing. The callback should be defined like: on_error(partition_context, error). The on_error callback will also be called if an unhandled exception is raised during the on_event_batch callback.

Default value: None
on_partition_initialize

The callback function that will be called after a consumer for a certain partition finishes initialization. It would also be called when a new internal partition consumer is created to take over the receiving process for a failed and closed internal partition consumer. The callback takes a single parameter: partition_context which contains the partition information. The callback should be defined like: on_partition_initialize(partition_context).

Default value: None
on_partition_close

The callback function that will be called after a consumer for a certain partition is closed. It is also called when an error is raised during receiving after retry attempts are exhausted. The callback takes two parameters: partition_context, which contains partition information, and reason, which is the reason for the close. The callback should be defined like: on_partition_close(partition_context, reason). Please refer to CloseReason for the various closing reasons.

Default value: None
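
To illustrate how max_batch_size bounds what on_event_batch receives, the sketch below splits a list of already received items into consecutive batches. The function split_into_batches is a hypothetical stand-in written for this illustration. It mirrors the documented behavior and is not the library's internal code.

```python
def split_into_batches(events, max_batch_size=300):
    """Split received events into consecutive batches of at most max_batch_size.

    Hypothetical helper for illustration; not part of azure-eventhub.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    return [events[i:i + max_batch_size] for i in range(0, len(events), max_batch_size)]


# 700 received events with the default max_batch_size of 300.
batches = split_into_batches(list(range(700)))
print([len(batch) for batch in batches])
```

Each resulting batch corresponds to one call of the on_event_batch callback, so a smaller max_batch_size means more frequent calls with less work per call.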

Returns

Type Description

Examples

Receive events in batches from the EventHub.


   import logging

   logger = logging.getLogger("azure.eventhub")

   async def on_event_batch(partition_context, event_batch):
       # Put your code here.
       # If the operation is I/O intensive, async will have better performance.
       logger.info(
           "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
       )

   async def receive_event_batches(consumer):
       # `consumer` is an existing EventHubConsumerClient instance.
       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )