适用于 Java 的 Azure 事件中心 Checkpoint Store 客户端库 - 版本 1.0.0-beta.2

使用用于 Redis 的 Jedis 客户端库

Azure 事件中心检查点存储可用于在处理来自Azure 事件中心的事件时存储检查点。 此包使用 Redis 作为持久存储来维护检查点和分区所有权信息。 JedisRedisCheckpointStore此包中提供的 可以插入 EventProcessorClient

源代码| API 参考文档 | 产品文档 | 示例

入门

先决条件

添加包

包括 BOM 文件

请将 azure-sdk-bom 包含在项目中,以依赖于库的正式发布 (GA) 版本。 在以下代码段中,将 {bom_version_to_target} 占位符替换为版本号。 若要详细了解 BOM,请参阅 AZURE SDK BOM 自述文件

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

然后在 dependencies 节中包含直接依赖项,不带版本标记,如下所示。

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
  </dependency>
</dependencies>

包括直接依赖项

如果要依赖于 BOM 中不存在的特定版本的库,请将直接依赖项添加到项目中,如下所示。

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
    <version>1.0.0-beta.2</version>
</dependency>

对存储容器客户端进行身份验证

若要创建 实例 JedisCheckpointStoreJedisPool 必须创建 对象。 若要生成此 JedisPool 对象,需要主机名 String 和主键 String。 这些可用于创建 JedisPool 对象,如下所示。

关键概念

此处详细介绍了关键概念。

示例

创建 JedisPool 的实例

若要使用 Azure Redis 缓存创建 JedisPool 实例,请按照在 Java 中使用 Azure Cache for Redis 中的说明提取主机名和访问密钥。 否则,请使用正在运行的 Redis 实例中的连接信息。

JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
    .password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
    .ssl(true)
    .build();

String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);

// Do things with JedisPool.

// Finally, dispose of resource
jedisPool.close();

使用事件处理程序客户端使用事件

若要使用事件中心的所有分区的事件,需要为特定的使用者组创建 EventProcessorClient 。 创建事件中心时,它提供可用于入门的默认使用者组。

EventProcessorClient会将事件的处理委托给你提供的回调函数,使你能够专注于提供值所需的逻辑,而处理器负责管理基础使用者操作。

在本示例中,我们将重点介绍如何生成 EventProcessor、使用 JedisRedisCheckpointStore和简单的回调函数来处理从事件中心接收的事件、写入控制台并在每个事件后更新 Blob 存储中的检查点。

JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
    .password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
    .ssl(true)
    .build();

String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
    .eventHubName("<< EVENT HUB NAME >>")
    .checkpointStore(new JedisCheckpointStore(jedisPool))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
    })
    .processError(context -> {
        System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
// Your application will probably keep the eventProcessorClient alive until the program ends.
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();

// Dispose of JedisPool resource.
jedisPool.close();

故障排除

启用客户端日志记录

Azure SDK for Java 提供一致的日志记录案例,以帮助排查应用程序错误并加快其解决速度。 生成的日志会在到达终端状态之前捕获应用程序的流,以帮助查找根本问题。 查看 日志记录 Wiki,获取有关启用日志记录的指南。

后续步骤

首先浏览 此处的示例。

贡献

如果你想成为此项目的活动参与者,请参阅我们的贡献指南了解详细信息。