Azure Event Hubs biblioteca cliente del Almacén de puntos de control para Java: versión 1.17.0
uso de blobs de almacenamiento
Azure Event Hubs Almacén de puntos de control se puede usar para almacenar puntos de control mientras se procesan eventos de Azure Event Hubs.
Este paquete usa blobs de almacenamiento como almacén persistente para mantener los puntos de control y la información de propiedad de la partición.
El BlobCheckpointStore
proporcionado en este paquete se puede conectar a EventProcessor
.
Código fuente | Documentación de referencia de API | Documentación del producto | Ejemplos
Introducción
Requisitos previos
- Un kit de desarrollo de Java (JDK), versión 8 o posterior.
- Maven
- Suscripción de Microsoft Azure
- Puede crear una cuenta gratuita en: https://azure.microsoft.com
- instancia de Azure Event Hubs
- Guía paso a paso para crear un centro de eventos mediante Azure Portal
- Cuenta de Azure Storage
- Guía paso a paso para crear una cuenta de Storage mediante Azure Portal
Inclusión del paquete
Inclusión del archivo BOM
Incluya azure-sdk-bom en el proyecto para depender de la versión de disponibilidad general (GA) de la biblioteca. En el fragmento de código siguiente, reemplace el marcador de posición {bom_version_to_target} por el número de versión. Para más información sobre la lista de materiales, consulte el archivo Léame bom del SDK de AZURE.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
y, a continuación, incluya la dependencia directa en la sección de dependencias sin la etiqueta de versión, como se muestra a continuación.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
</dependencies>
Inclusión de dependencias directas
Si desea depender de una versión determinada de la biblioteca que no está presente en la lista de materiales, agregue la dependencia directa al proyecto como se indica a continuación.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.17.0</version>
</dependency>
Autenticación del cliente de contenedor de almacenamiento
Para crear una instancia de BlobCheckpointStore
, primero se debe crear un ContainerAsyncClient
con el token de SAS adecuado con acceso de escritura y cadena de conexión. Para que esto sea posible, necesitará la cadena saS de cuenta (firma de acceso compartido) de la cuenta de almacenamiento. Obtenga más información en Token de SAS.
Conceptos clave
Puntos de control
Puntos de control es un proceso en el que los lectores marcan o confirman su posición dentro de la secuencia de eventos de una partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado. Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.
Desplazamientos de números de & secuencia
Ambos números de secuencia de desplazamiento & hacen referencia a la posición de un evento dentro de una partición. Puede considerarlos como un cursor del lado cliente. El desplazamiento es una numeración de byte del evento. El número de desplazamiento o secuencia permite a un consumidor de eventos (lector) especificar un punto en el flujo de eventos desde el que desean empezar a leer eventos. Puede especificar la marca de tiempo de modo que reciba eventos que se poneron en cola solo después de la marca de tiempo especificada. Los consumidores son responsables de almacenar sus propios valores de desplazamiento fuera del servicio Event Hubs. Dentro de una partición, cada evento incluye un desplazamiento, un número de secuencia y la marca de tiempo de cuando se puso en cola.
Ejemplos
- Creación de una instancia del cliente de contenedor de almacenamiento
- Creación de una instancia mediante Azure Identity
- Consumo de eventos de todas las particiones de Event Hubs
- Especificación de la versión de almacenamiento para crear el almacén de puntos de control
Creación de una instancia del contenedor de Storage con el token de SAS
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
Consumo de eventos mediante un cliente de procesador de eventos
Para consumir eventos para todas las particiones de un centro de eventos, creará un EventProcessorClient
para un grupo de consumidores específico. Cuando se crea un centro de eventos, proporciona un grupo de consumidores predeterminado que se puede usar para empezar.
EventProcessorClient
delegará el procesamiento de eventos en una función de devolución de llamada que proporcione, lo que le permite centrarse en la lógica necesaria para proporcionar valor, mientras que el procesador es responsable de administrar las operaciones de consumidor subyacentes.
En nuestro ejemplo, nos centraremos en compilar EventProcessor
, usaremos y BlobCheckpointStore
una función de devolución de llamada simple para procesar los eventos recibidos de Event Hubs, escribiremos en la consola y actualizaremos el punto de control en Blob Storage después de cada evento.
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
.containerName("<CONTAINER_NAME>")
.sasToken("<SAS_TOKEN>")
.buildAsyncClient();
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB CONNECTION STRING >>")
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
Solución de problemas
Habilitación del registro de cliente
El SDK de Azure para Java ofrece un artículo de registro coherente para ayudar a solucionar errores de aplicación y acelerar su resolución. Los registros generados capturarán el flujo de una aplicación antes de alcanzar el estado terminal para ayudar a encontrar el problema raíz. Consulte la wiki de registro para obtener instrucciones sobre cómo habilitar el registro.
Biblioteca SSL predeterminada
De forma predeterminada, todas las bibliotecas cliente usan la biblioteca Boring SSL nativa de Tomcat para habilitar el rendimiento de nivel nativo para las operaciones SSL. La biblioteca Boring SSL es un archivo uber-jar que contiene bibliotecas nativas para Linux, macOS o Windows, que proporciona un mejor rendimiento en comparación con la implementación SSL predeterminada del JDK. Para obtener más información, incluido cómo reducir el tamaño de las dependencias, consulte la sección optimización del rendimiento de la wiki.
Pasos siguientes
Para empezar, explore los ejemplos aquí.
Contribuciones
Si desea convertirse en colaborador activo de este proyecto, consulte nuestras Directrices de contribución para obtener más información.