Partage via


Transactions dans Apache Kafka pour Azure Event Hubs

Cet article fournit des détails sur l’utilisation de l’API transactionnelle Apache Kafka avec Azure Event Hubs.

Vue d’ensemble

Event Hubs fournit un point de terminaison Kafka qui peut être utilisé par vos applications clientes Kafka pour éviter d’exécuter votre propre cluster Kafka. Event Hubs fonctionne avec un grand nombre de vos applications Kafka existantes. Pour plus d’informations, consultez Event Hubs pour Apache Kafka.

Ce document se concentre sur l’utilisation de l’API transactionnelle Kafka avec Azure Event Hubs en toute transparence.

Remarque

Kafka Transactions est actuellement en Préversion publique dans les niveaux Premium et Dédié.

Transactions dans Apache Kafka

Dans les environnements natifs cloud, les applications doivent être résilientes aux interruptions réseau et aux redémarrages et mises à niveau de l’espace de noms. Les applications nécessitant des garanties de traitement strictes doivent utiliser un cadre transactionnel ou une API pour s’assurer que toutes les opérations sont exécutées, ou qu’aucune ne l’est afin que l’état de l’application et des données soit géré de manière fiable. Si l’ensemble d’opérations échoue, il peut être réessayé de manière atomique de manière fiable pour assurer les garanties de traitement appropriées.

Remarque

Les garanties transactionnelles sont généralement requises lorsqu’il existe plusieurs opérations qui doivent être traitées de manière « tout ou rien ».

Pour toutes les autres opérations, les applications clientes sont résilientes par défaut pour réessayer l’opération avec une interruption exponentielle, si l’opération spécifique a échoué.

Apache Kafka fournit une API transactionnelle pour assurer ce niveau de garanties de traitement sur le même ensemble de rubriques/partitions identiques ou un ensemble différent.

Les transactions s’appliquent aux cas ci-dessous :

  • Producteurs transactionnels.
  • Sémantique de traitement Une fois exactement.

Producteurs transactionnels

Les producteurs transactionnels garantissent que les données sont écrites atomiquement dans plusieurs partitions dans différentes rubriques. Les producteurs peuvent lancer une transaction, écrire dans plusieurs partitions sur la même rubrique ou dans différentes rubriques, puis valider ou abandonner la transaction.

Pour garantir qu’un producteur est transactionnel, enable.idempotence doit être défini sur true pour s’assurer que les données sont écrites exactement une seule fois, ce qui évite les doublons côté envoi. En outre, transaction.id doit être défini pour identifier de manière unique le producteur.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Une fois le producteur initialisé, l’appel ci-dessous garantit que le producteur s’inscrit auprès du courtier en tant que producteur transactionnel :

    producer.initTransactions();

Le producteur doit ensuite commencer explicitement une transaction, effectuer des opérations d’envoi sur différentes rubriques et partitions comme à la normale, puis valider la transaction avec l’appel ci-dessous :

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Si la transaction doit être abandonnée en raison d’une erreur ou d’un délai d’expiration, le producteur peut appeler la méthode abortTransaction().

	producer.abortTransaction();

Sémantique Une fois exactement

La sémantique Une fois exactement s’appuie sur les producteurs transactionnels en ajoutant des consommateurs dans l’étendue transactionnelle des producteurs, afin de garantir que chaque enregistrement est lu, traité et écrit une fois exactement.

Tout d’abord, le producteur transactionnel est instancié :


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Ensuite, le consommateur doit être configuré pour lire uniquement les messages non transactionnels ou les messages transactionnels validés en définissant la propriété ci-dessous :


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Une fois que le consommateur est instancié, il peut s’abonner à la rubrique à partir de laquelle les enregistrements doivent être lus :


    consumer.subscribe(singleton("inputTopic"));

Une fois que le consommateur interroge les enregistrements de la rubrique d’entrée, le producteur commence l’étendue transactionnelle dans laquelle l’enregistrement est traité et écrit dans la rubrique de sortie. Une fois les enregistrements écrits, la carte mise à jour des décalages pour toutes les partitions est créée. Le producteur envoie ensuite ce mappage de décalage mis à jour à la transaction avant de valider la transaction.

Dans toute exception, la transaction est abandonnée et le producteur retente le traitement de façon atomique.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Avertissement

Si la transaction n’est ni validée ni abandonnée avant le max.transaction.timeout.ms, la transaction est abandonnée automatiquement par Event Hubs. La valeur max.transaction.timeout.ms par défaut est définie sur 15 minutes par Event Hubs, mais le producteur peut le remplacer par une valeur inférieure en définissant la propriété transaction.timeout.ms dans les propriétés de configuration du producteur.

Guide de migration

Si vous avez des applications Kafka existantes que vous souhaitez utiliser avec Azure Event Hubs, veuillez consulter le Guide de migration Kafka pour Azure Event Hubs afin de démarrer rapidement.

Étapes suivantes

Pour plus d’informations sur Event Hubs et sur Event Hubs pour Kafka, consultez les articles suivants :