Azure Event Hubs biblioteca cliente del Almacén de puntos de comprobación para Java: versión 1.0.0-beta.2
uso de la biblioteca cliente de Jedis para Redis
Azure Event Hubs Almacén de puntos de control se puede usar para almacenar puntos de control mientras se procesan eventos de Azure Event Hubs.
Este paquete usa Redis como almacén persistente para mantener los puntos de control y la información de propiedad de las particiones.
El JedisRedisCheckpointStore
proporcionado en este paquete se puede conectar a EventProcessorClient
.
Código fuente| Documentación de referencia de API | Documentación del producto | Ejemplos
Introducción
Requisitos previos
- Un kit de desarrollo de Java (JDK), versión 8 o posterior.
- Maven
- Suscripción de Microsoft Azure
- Puede crear una cuenta gratuita en: https://azure.microsoft.com
- instancia de Azure Event Hubs
- Guía paso a paso para crear un centro de eventos mediante Azure Portal
- Azure Redis Cache o un servidor de Redis alternativo adecuado
- Guía paso a paso para crear una instancia de Redis Cache mediante Azure Portal
Inclusión del paquete
Inclusión del archivo BOM
Incluya azure-sdk-bom en el proyecto para depender de la versión de disponibilidad general (GA) de la biblioteca. En el fragmento de código siguiente, reemplace el marcador de posición {bom_version_to_target} por el número de versión. Para más información sobre la lista de materiales, consulte EL ARCHIVO LÉAME BOM del SDK de AZURE.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
y, a continuación, incluya la dependencia directa en la sección dependencias sin la etiqueta de versión, como se muestra a continuación.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
</dependency>
</dependencies>
Inclusión de dependencias directas
Si quiere depender de una versión determinada de la biblioteca que no está presente en la lista de materiales, agregue la dependencia directa al proyecto como se indica a continuación.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
<version>1.0.0-beta.2</version>
</dependency>
Autenticación del cliente de contenedor de almacenamiento
Para crear una instancia de JedisCheckpointStore
, se debe crear un JedisPool
objeto . Para que este JedisPool
objeto sea necesario un nombre de host String y una cadena de clave principal. Se pueden usar como se muestra a continuación para crear un JedisPool
objeto .
Conceptos clave
Aquí se explican los conceptos clave.
Ejemplos
- Creación y ejecución de una instancia de JedisRedisCheckpointStore
- Consumo de eventos de todas las particiones de Event Hubs
Creación de una instancia de JedisPool
Para crear una instancia de JedisPool mediante Azure Redis Cache, siga las instrucciones de Uso de Azure Cache for Redis en Java para capturar el nombre de host y la clave de acceso. De lo contrario, use la información de conexión de una instancia de Redis en ejecución.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
// Do things with JedisPool.
// Finally, dispose of resource
jedisPool.close();
Consumo de eventos mediante un cliente de procesador de eventos
Para consumir eventos para todas las particiones de un centro de eventos, creará un EventProcessorClient
para un grupo de consumidores específico. Cuando se crea un centro de eventos, proporciona un grupo de consumidores predeterminado que se puede usar para empezar.
EventProcessorClient
delegará el procesamiento de eventos en una función de devolución de llamada que proporcione, lo que le permitirá centrarse en la lógica necesaria para proporcionar valor, mientras que el procesador tiene la responsabilidad de administrar las operaciones de consumidor subyacentes.
En nuestro ejemplo, nos centraremos en compilar , usaremos y JedisRedisCheckpointStore
una función de devolución de llamada sencilla para procesar los eventos recibidos de Event Hubs, escribiremos en la EventProcessor
consola y actualizaremos el punto de control en Blob Storage después de cada evento.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
.eventHubName("<< EVENT HUB NAME >>")
.checkpointStore(new JedisCheckpointStore(jedisPool))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(context -> {
System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
// Your application will probably keep the eventProcessorClient alive until the program ends.
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
// Dispose of JedisPool resource.
jedisPool.close();
Solución de problemas
Habilitación del registro de cliente
Azure SDK para Java ofrece un artículo de registro coherente para ayudar a solucionar errores de aplicación y acelerar su resolución. Los registros generados capturarán el flujo de una aplicación antes de alcanzar el estado terminal para ayudar a encontrar el problema raíz. Vea la wiki de registro para obtener instrucciones sobre cómo habilitar el registro.
Pasos siguientes
Para empezar, explore los ejemplos aquí.
Contribuciones
Si desea convertirse en colaborador activo de este proyecto, consulte nuestras Directrices de contribución para obtener más información.
Azure SDK for Java