Azure Event Hubs Checkpoint Store client library for Java - version 1.20.1
using Storage Blobs
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
This package uses Storage Blobs as a persistent store for maintaining checkpoints and partition ownership information.
The BlobCheckpointStore provided in this package can be plugged in to EventProcessorClient.
Source code | API reference documentation | Product documentation | Samples
Getting started
Prerequisites
- A Java Development Kit (JDK), version 8 or later.
  - Here are details about Java 8 client compatibility with Azure Certificate Authority.
- Maven
- Microsoft Azure subscription
  - You can create a free account at: https://azure.microsoft.com
- Azure Event Hubs instance
  - Step-by-step guide for creating an Event Hub using the Azure Portal
- Azure Storage account
  - Step-by-step guide for creating a Storage account using the Azure Portal
Include the package
Include the BOM file
Include the azure-sdk-bom in your project to take a dependency on the General Availability (GA) version of the library. In the following snippet, replace the {bom_version_to_target} placeholder with the version number. To learn more about the BOM, see the AZURE SDK BOM README.
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Then include the direct dependency in the dependencies section, without the version tag, as shown below.
<dependencies>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    </dependency>
</dependencies>
Include direct dependency
To take a dependency on a particular version of the library that is not present in the BOM, add the direct dependency to your project as follows.
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.20.1</version>
</dependency>
Authenticate the storage container client
To create an instance of BlobCheckpointStore, first create a BlobContainerAsyncClient with a connection string and a SAS token that has write access. For this you need the Account SAS (shared access signature) string of the Storage account. Learn more at SAS Token.
Key concepts
Checkpointing
Checkpointing is a process by which readers mark or commit their position within a partition event sequence. Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group. This responsibility means that for each consumer group, each partition reader must keep track of its current position in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the location at which to start reading. In this way, you can use checkpointing to both mark events as "complete" by downstream applications, and to provide resiliency if a failover between readers running on different machines occurs. It is possible to return to older data by specifying a lower offset from this checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and event stream replay.
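The resume-and-replay behavior described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not the Event Hubs or BlobCheckpointStore API: the `CheckpointSketch` and `InMemoryCheckpointStore` names are hypothetical, the list index stands in for the sequence number, and a checkpoint is written after every event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative in-memory model of per-partition checkpointing; not the Event Hubs API. */
final class CheckpointSketch {

    /** Hypothetical store keyed by "consumerGroup/partitionId"; the value is the last processed sequence number. */
    static final class InMemoryCheckpointStore {
        private final Map<String, Long> checkpoints = new ConcurrentHashMap<>();

        void update(String consumerGroup, String partitionId, long sequenceNumber) {
            checkpoints.put(consumerGroup + "/" + partitionId, sequenceNumber);
        }

        /** Returns -1 when no checkpoint exists, which this sketch treats as "start from the beginning". */
        long lastCheckpoint(String consumerGroup, String partitionId) {
            return checkpoints.getOrDefault(consumerGroup + "/" + partitionId, -1L);
        }
    }

    /**
     * Reads up to maxEvents events that follow the stored checkpoint and
     * checkpoints after each one. The list index stands in for the sequence number.
     */
    static List<String> readFromCheckpoint(InMemoryCheckpointStore store, String consumerGroup,
            String partitionId, List<String> partition, int maxEvents) {
        List<String> processed = new ArrayList<>();
        long next = store.lastCheckpoint(consumerGroup, partitionId) + 1;
        for (long seq = next; seq < partition.size() && processed.size() < maxEvents; seq++) {
            processed.add(partition.get((int) seq));
            store.update(consumerGroup, partitionId, seq);
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryCheckpointStore store = new InMemoryCheckpointStore();
        List<String> partition = Arrays.asList("e0", "e1", "e2", "e3", "e4");

        // The first reader handles two events, then disconnects.
        System.out.println(readFromCheckpoint(store, "$Default", "0", partition, 2));
        // A replacement reader in the same consumer group resumes after the checkpoint.
        System.out.println(readFromCheckpoint(store, "$Default", "0", partition, 10));
        // Replay: moving the checkpoint back makes older events readable again.
        store.update("$Default", "0", 0);
        System.out.println(readFromCheckpoint(store, "$Default", "0", partition, 10));
    }
}
```

Keying the store by consumer group and partition mirrors the per-partition, per-consumer-group scope of checkpoints; a different consumer group reading the same partition would start from its own position.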
Offsets & sequence numbers
Both offset & sequence number refer to the position of an event within a partition. You can think of them as a client-side cursor. The offset is a byte numbering of the event. The offset/sequence number enables an event consumer (reader) to specify a point in the event stream from which they want to begin reading events. You can specify the timestamp such that you receive events that were enqueued only after the given timestamp. Consumers are responsible for storing their own offset values outside the Event Hubs service. Within a partition, each event includes an offset, sequence number, and the timestamp of when it was enqueued.
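To make the three ways of choosing a starting point concrete, here is a minimal in-memory sketch. It is not the Event Hubs API: the `PositionSketch` class, its `Event` type, and its method names are hypothetical, offsets are assumed to advance by each body's UTF-8 byte length, and the offset and sequence number lookups are treated as inclusive purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative in-memory model of event positions; not the Event Hubs API. */
final class PositionSketch {

    /** Hypothetical event carrying the three position-related fields named in the text. */
    static final class Event {
        final long offset;          // byte position of the event within the partition
        final long sequenceNumber;  // ordinal position within the partition
        final Instant enqueuedTime; // when the event was enqueued
        final String body;

        Event(long offset, long sequenceNumber, Instant enqueuedTime, String body) {
            this.offset = offset;
            this.sequenceNumber = sequenceNumber;
            this.enqueuedTime = enqueuedTime;
            this.body = body;
        }
    }

    /** Builds a partition: offsets advance by each body's byte length, sequence numbers by one, times by one second. */
    static List<Event> buildPartition(Instant firstEnqueued, String... bodies) {
        List<Event> events = new ArrayList<>();
        long offset = 0;
        for (int i = 0; i < bodies.length; i++) {
            events.add(new Event(offset, i, firstEnqueued.plusSeconds(i), bodies[i]));
            offset += bodies[i].getBytes(StandardCharsets.UTF_8).length;
        }
        return events;
    }

    /** Returns the bodies of all events that satisfy the starting condition. */
    private static List<String> select(List<Event> partition, Predicate<Event> condition) {
        List<String> bodies = new ArrayList<>();
        for (Event event : partition) {
            if (condition.test(event)) {
                bodies.add(event.body);
            }
        }
        return bodies;
    }

    /** Starts at the event whose offset is at or past the given byte offset. */
    static List<String> readFromOffset(List<Event> partition, long offset) {
        return select(partition, e -> e.offset >= offset);
    }

    /** Starts at the event with the given sequence number. */
    static List<String> readFromSequenceNumber(List<Event> partition, long sequenceNumber) {
        return select(partition, e -> e.sequenceNumber >= sequenceNumber);
    }

    /** Returns only events enqueued strictly after the given timestamp. */
    static List<String> readAfterEnqueuedTime(List<Event> partition, Instant timestamp) {
        return select(partition, e -> e.enqueuedTime.isAfter(timestamp));
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        List<Event> partition = buildPartition(start, "a", "bcd", "ef");
        System.out.println(readFromOffset(partition, 1));
        System.out.println(readFromSequenceNumber(partition, 2));
        System.out.println(readAfterEnqueuedTime(partition, start));
    }
}
```

Because all three fields increase monotonically within a partition, each condition selects a contiguous tail of the event sequence, which is why any of them can serve as a cursor.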
Examples
- Create an instance of Storage Container client
- Create an instance using Azure Identity
- Consume events from all Event Hub partitions
- Specify storage version to create checkpoint store
Create an instance of Storage container with SAS token
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();
Consume events using an Event Processor Client
To consume events for all partitions of an Event Hub, you'll create an EventProcessorClient for a specific consumer group. When an Event Hub is created, it provides a default consumer group that can be used to get started.
The EventProcessorClient will delegate processing of events to a callback function that you provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility for managing the underlying consumer operations.
In our example, we build an EventProcessorClient that uses the BlobCheckpointStore and a simple callback function. The callback processes each event received from the Event Hub, writes it to the console, and updates the checkpoint in Blob storage after each event.
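// Imports required by this snippet.
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

// Storage container client that holds checkpoints and partition ownership information.
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB CONNECTION STRING >>")
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
        // Persist this event's position so that a restarted reader resumes after it.
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();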
Troubleshooting
Enable client logging
Azure SDK for Java offers a consistent logging story to aid in troubleshooting application errors and expedite their resolution. The logs produced capture the flow of an application before it reaches the terminal state, which helps locate the root issue. View the logging wiki for guidance about enabling logging.
Default SSL library
All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides better performance compared to the default SSL implementation within the JDK. For more information, including how to reduce the dependency size, refer to the performance tuning section of the wiki.
Next steps
Get started by exploring the samples here.
Contributing
If you would like to become an active contributor to this project, please refer to our Contribution Guidelines for more information.