Compartir a través de


Azure Event Hubs biblioteca cliente del Almacén de puntos de control para Java: versión 1.17.0

uso de blobs de almacenamiento

Azure Event Hubs Almacén de puntos de control se puede usar para almacenar puntos de control mientras se procesan eventos de Azure Event Hubs. Este paquete usa blobs de almacenamiento como almacén persistente para mantener los puntos de control y la información de propiedad de la partición. El BlobCheckpointStore proporcionado en este paquete se puede conectar a EventProcessor.

Código fuente | Documentación de referencia de API | Documentación del producto | Ejemplos

Introducción

Requisitos previos

Inclusión del paquete

Inclusión del archivo BOM

Incluya azure-sdk-bom en el proyecto para depender de la versión de disponibilidad general (GA) de la biblioteca. En el fragmento de código siguiente, reemplace el marcador de posición {bom_version_to_target} por el número de versión. Para más información sobre la lista de materiales, consulte el archivo Léame bom del SDK de AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

y, a continuación, incluya la dependencia directa en la sección de dependencias sin la etiqueta de versión, como se muestra a continuación.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
  </dependency>
</dependencies>

Inclusión de dependencias directas

Si desea depender de una versión determinada de la biblioteca que no está presente en la lista de materiales, agregue la dependencia directa al proyecto como se indica a continuación.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.17.0</version>
</dependency>

Autenticación del cliente de contenedor de almacenamiento

Para crear una instancia de BlobCheckpointStore, primero se debe crear un ContainerAsyncClient con el token de SAS adecuado con acceso de escritura y cadena de conexión. Para que esto sea posible, necesitará la cadena saS de cuenta (firma de acceso compartido) de la cuenta de almacenamiento. Obtenga más información en Token de SAS.

Conceptos clave

Puntos de control

Puntos de control es un proceso en el que los lectores marcan o confirman su posición dentro de la secuencia de eventos de una partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado. Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.

Desplazamientos de números de & secuencia

Ambos números de secuencia de desplazamiento & hacen referencia a la posición de un evento dentro de una partición. Puede considerarlos como un cursor del lado cliente. El desplazamiento es una numeración de byte del evento. El número de desplazamiento o secuencia permite a un consumidor de eventos (lector) especificar un punto en el flujo de eventos desde el que desean empezar a leer eventos. Puede especificar la marca de tiempo de modo que reciba eventos que se poneron en cola solo después de la marca de tiempo especificada. Los consumidores son responsables de almacenar sus propios valores de desplazamiento fuera del servicio Event Hubs. Dentro de una partición, cada evento incluye un desplazamiento, un número de secuencia y la marca de tiempo de cuando se puso en cola.

Ejemplos

Creación de una instancia del contenedor de Storage con el token de SAS

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

Consumo de eventos mediante un cliente de procesador de eventos

Para consumir eventos para todas las particiones de un centro de eventos, creará un EventProcessorClient para un grupo de consumidores específico. Cuando se crea un centro de eventos, proporciona un grupo de consumidores predeterminado que se puede usar para empezar.

EventProcessorClient delegará el procesamiento de eventos en una función de devolución de llamada que proporcione, lo que le permite centrarse en la lógica necesaria para proporcionar valor, mientras que el procesador es responsable de administrar las operaciones de consumidor subyacentes.

En nuestro ejemplo, nos centraremos en compilar EventProcessor, usaremos y BlobCheckpointStoreuna función de devolución de llamada simple para procesar los eventos recibidos de Event Hubs, escribiremos en la consola y actualizaremos el punto de control en Blob Storage después de cada evento.

BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
    .connectionString("<STORAGE_ACCOUNT_CONNECTION_STRING>")
    .containerName("<CONTAINER_NAME>")
    .sasToken("<SAS_TOKEN>")
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .connectionString("<< EVENT HUB CONNECTION STRING >>")
    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
    .processEvent(eventContext -> {
        System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
            + "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
    })
    .processError(errorContext -> {
        System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
    })
    .buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();

Solución de problemas

Habilitación del registro de cliente

El SDK de Azure para Java ofrece un artículo de registro coherente para ayudar a solucionar errores de aplicación y acelerar su resolución. Los registros generados capturarán el flujo de una aplicación antes de alcanzar el estado terminal para ayudar a encontrar el problema raíz. Consulte la wiki de registro para obtener instrucciones sobre cómo habilitar el registro.

Biblioteca SSL predeterminada

De forma predeterminada, todas las bibliotecas cliente usan la biblioteca Boring SSL nativa de Tomcat para habilitar el rendimiento de nivel nativo para las operaciones SSL. La biblioteca Boring SSL es un archivo uber-jar que contiene bibliotecas nativas para Linux, macOS o Windows, que proporciona un mejor rendimiento en comparación con la implementación SSL predeterminada del JDK. Para obtener más información, incluido cómo reducir el tamaño de las dependencias, consulte la sección optimización del rendimiento de la wiki.

Pasos siguientes

Para empezar, explore los ejemplos aquí.

Contribuciones

Si desea convertirse en colaborador activo de este proyecto, consulte nuestras Directrices de contribución para obtener más información.

Impresiones