Azure Event Hubs biblioteca de almacén de puntos de control para Javascript mediante blobs de almacenamiento
Una solución basada en Azure Blob Storage para almacenar puntos de control y ayudar en el equilibrio de carga al usar EventHubConsumerClient
desde la biblioteca @azure/event-hubs
Código | fuentePaquete (npm) | Documentación | de referencia de APIMuestras
Introducción
Instalar el paquete
Instalación de la biblioteca de blobs del almacén de puntos de control de Azure Event Hubs mediante npm
npm install @azure/eventhubs-checkpointstore-blob
Requisitos previos: debe tener una suscripción de Azure, un espacio de nombres de Event Hubs para usar este paquete y una cuenta de almacenamiento.
Si usa este paquete en una aplicación de Node.js, use Node.js 8.x o superior.
Configurar Typescript
Los usuarios de TypeScript deben tener instaladas definiciones de tipo de nodo:
npm install @types/node
También debe habilitar compilerOptions.allowSyntheticDefaultImports
en tsconfig.json. Tenga en cuenta que si ha habilitado , allowSyntheticDefaultImports
está habilitado compilerOptions.esModuleInterop
de forma predeterminada. Consulte el manual de opciones del compilador de TypeScript para obtener más información.
Conceptos clave
Escalado: cree varios consumidores y que cada consumidor tome posesión de la lectura desde varias particiones de Event Hubs.
Equilibrio de carga: Las aplicaciones que admiten el equilibrio de carga constan de una o varias instancias de
EventHubConsumerClient
las cuales se han configurado para consumir eventos del mismo centro de eventos y del mismo grupo de consumidores y del mismoCheckpointStore
. Equilibran la carga de trabajo entre distintas instancias mediante la distribución de las particiones que se van a procesar entre sí.Puntos de control: Es un proceso por el que los lectores marcan o confirman su posición dentro de una secuencia de eventos de partición. La creación de puntos de comprobación es responsabilidad del consumidor y se realiza por partición dentro de un grupo de consumidores. Esta responsaibilidad significa que por cada grupo de consumidores, cada lector de la partición debe realizar un seguimiento de su posición actual en el flujo del evento y puede informar al servicio cuando considere que el flujo de datos se ha completado.
Si se desconecta un lector de una partición, cuando se vuelve a conectar comienza a leer en el punto de comprobación que envió previamente el último lector de esa partición en ese grupo de consumidores. Cuando se conecta el lector, pasa este desplazamiento al centro de eventos para especificar la ubicación en la que se va a empezar a leer. De este modo, puede usar puntos de comprobación para marcar eventos como "completados" por las aplicaciones de bajada y para ofrecer resistencia en caso de que se produzca una conmutación por error entre lectores que se ejecutan en máquinas distintas. Es posible volver a los datos más antiguos especificando un desplazamiento inferior desde este proceso de puntos de comprobación. Mediante este mecanismo, los puntos de comprobación permiten una resistencia a la conmutación por error y una reproducción del flujo de eventos.
BlobCheckpointStore es una clase que implementa métodos clave requeridos por EventHubConsumerClient para equilibrar los puntos de control de carga y actualización.
Ejemplos
- Creación de una instancia de CheckpointStore mediante Azure Blob Storage
- Eventos de punto de comprobación mediante Azure Blob Storage
Creación de un CheckpointStore
objeto mediante Azure Blob Storage
Use el fragmento de código siguiente para crear un CheckpointStore
. Deberá proporcionar el cadena de conexión a la cuenta de almacenamiento.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Eventos de punto de comprobación mediante Azure Blob Storage
Para controlar los eventos recibidos mediante Azure Blob Storage, deberá pasar un objeto compatible con la interfaz SubscriptionEventHandlers junto con el código para llamar al updateCheckpoint()
método .
En este ejemplo, SubscriptionHandlers
implementa SubscriptionEventHandlers y también controla los puntos de comprobación.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Solución de problemas
Habilitamiento de registros
Puede establecer la AZURE_LOG_LEVEL
variable de entorno en uno de los valores siguientes para habilitar el registro en stderr
:
- verbose
- info
- warning
- error
También puede establecer el nivel de registro mediante programación importando el paquete @azure/registrador y llamando a la setLogLevel
función con uno de los valores de nivel de registro.
Al establecer un nivel de registro mediante programación o a través de la AZURE_LOG_LEVEL
variable de entorno, se emitirán todos los registros escritos mediante un nivel de registro igual o menor que el que elija.
Por ejemplo, al establecer el nivel info
de registro en , los registros que se escriben para los niveles warning
y error
también se emiten.
Este SDK sigue las directrices del SDK de Azure para TypeScript al determinar en qué nivel se debe iniciar sesión.
También puede establecer la DEBUG
variable de entorno para obtener registros al usar esta biblioteca.
Esto puede ser útil si también desea emitir registros de las dependencias rhea-promise
y rhea
también.
Nota: AZURE_LOG_LEVEL, si se establece, tiene prioridad sobre DEBUG.
No especifique ninguna azure
biblioteca a través de DEBUG cuando también especifique AZURE_LOG_LEVEL o llame a setLogLevel.
Puede establecer la siguiente variable de entorno para obtener los registros de depuración cuando se usa esta biblioteca.
- Obtener solo los registros de depuración de nivel de información del blob De almacén de puntos de control de Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Registro en un archivo
Habilite el registro como se muestra anteriormente y, a continuación, ejecute el script de prueba como se indica a continuación:
Las instrucciones de registro del script de prueba van a
out.log
y las instrucciones de registro del sdk van adebug.log
.node your-test-script.js > out.log 2>debug.log
Las instrucciones de registro del script de prueba y el SDK van al mismo archivo
out.log
redirigiendo stderr a stdout (&1) y, a continuación, redirigen stdout a un archivo:node your-test-script.js >out.log 2>&1
Las instrucciones de registro del script de prueba y el SDK van al mismo archivo
out.log
.node your-test-script.js &> out.log
Pasos siguientes
Eche un vistazo al directorio de ejemplos para obtener un ejemplo detallado.
Contribuciones
Si desea contribuir a esta biblioteca, lea la guía de contribución para obtener más información sobre cómo compilar y probar el código.
Azure SDK for JavaScript