Compartir vía


Transacciones en Apache Kafka para Azure Event Hubs

En este artículo se detalla cómo usar la API transaccional de Apache Kafka con Azure Event Hubs.

Información general

Event Hubs proporciona un punto de conexión de Kafka que las aplicaciones cliente de Kafka existentes pueden usar como alternativa a la ejecución de su propio clúster de Kafka. Event Hubs funciona con muchas de sus aplicaciones de Kafka actuales. Para obtener más información, consulte Event Hubs para Apache Kafka.

Este documento se centra en cómo usar la API transaccional de Kafka con Azure Event Hubs sin problemas.

Nota:

Las transacciones de Kafka se encuentran actualmente en versión preliminar pública en el nivel Premium y Dedicado.

Transacciones en Apache Kafka

En entornos nativos en la nube, las aplicaciones deben ser resistentes a las interrupciones de red y a los reinicios y actualizaciones del espacio de nombres. Las aplicaciones que requieren garantías estrictas de procesamiento deben usar un marco transaccional o una API para garantizar que se ejecuten todas las operaciones o que no se ejecute ninguna, de modo que el estado de la aplicación y de los datos se administre de forma fiable. Si el conjunto de operaciones falla, se pueden volver a probar atómicamente de forma fiable para garantizar el procesamiento correcto.

Nota:

Las garantías transaccionales suelen ser necesarias cuando hay varias operaciones que deben procesarse de forma "todo o nada".

Para todas las demás operaciones, las aplicaciones cliente son resistentes de manera predeterminada para reintentar la operación con un retroceso exponencial, si la operación específica falló.

Apache Kafka proporciona una API transaccional para garantizar este nivel de procesamiento en el mismo conjunto o en conjuntos diferentes de temas o particiones.

Las transacciones se aplican a los casos siguientes:

  • Productores transaccionales.
  • Semántica de procesamiento exactamente una vez.

Productores transaccionales

Los productores transaccionales garantizan que los datos se escriban atómicamente en varias particiones de diferentes temas. Los productores pueden iniciar una transacción, escribir en varias particiones en el mismo tema o en distintos temas y, a continuación, confirmar o anular la transacción.

Para asegurarse de que un productor es transaccional, enable.idempotence debería establecerse en verdadero para garantizar que los datos se escriben exactamente una vez, evitando así duplicados en el lado de envío. Además, transaction.id debería establecerse para identificar unívocamente al productor.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Una vez inicializado el productor, la llamada siguiente garantiza que el productor se registre en el agente como productor transaccional:

    producer.initTransactions();

El productor debe entonces iniciar una transacción explícitamente, realizar operaciones de envío a través de diferentes temas y particiones de forma normal, y después confirmar la transacción con la siguiente llamada:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Si es necesario abortar la transacción debido a un error o a que se ha agotado el tiempo de espera, el productor puede llamar al método abortTransaction().

	producer.abortTransaction();

Semántica de exactamente una vez

La semántica de exactamente una vez se basa en los productores transaccionales añadiendo consumidores en el ámbito transaccional de los productores, de modo que se garantiza que cada registro se lee, procesa y escribe exactamente una vez.

Primero se crea una instancia del productor transaccional:


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Después, el consumidor debe configurarse para leer solo mensajes no transaccionales, o mensajes transaccionales confirmados estableciendo la siguiente propiedad:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Una vez creada la instancia del consumidor, puede suscribirse al tema desde el que deben leerse los registros:


    consumer.subscribe(singleton("inputTopic"));

Cuando el consumidor sondea los registros del tema de entrada, el productor inicia el ámbito transaccional dentro del cual se procesa el registro y se escribe en el tema de salida. Una vez escritos los registros, se crea la asignación de desplazamientos actualizada para todas las particiones. A continuación, el productor envía esta asignación de desplazamientos actualizada a la transacción antes de confirmar la transacción.

En cualquier excepción, la transacción se aborta y el productor reintenta el procesamiento una vez más de forma atómica.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Advertencia

Si la transacción no se confirma ni anula antes de max.transaction.timeout.ms, Event Hubs anulará automáticamente la transacción. De manera predeterminada, max.transaction.timeout.ms está establecido en 15 minutos por Event Hubs, pero el productor puede invalidarlo a un valor más bajo estableciendo la propiedad transaction.timeout.ms en las propiedades de configuración del productor.

Guía de migración

Si tiene aplicaciones de Kafka que quiera usar con Azure Event Hubs, consulte la Guía de migración de Kafka para Azure Event Hubs para empezar a trabajar rápidamente.

Pasos siguientes

Para obtener más información acerca de Event Hubs y Event Hubs para Kafka, consulte los artículos siguientes: