Delen via


Transacties in Apache Kafka voor Azure Event Hubs

In dit artikel vindt u meer informatie over het gebruik van de transactionele API van Apache Kafka met Azure Event Hubs.

Overzicht

Event Hubs biedt een Kafka-eindpunt dat kan worden gebruikt door uw bestaande Kafka-clienttoepassingen als alternatief voor het uitvoeren van uw eigen Kafka-cluster. Event Hubs werkt met veel van uw bestaande Kafka-toepassingen. Zie Event Hubs voor Apache Kafka voor meer informatie.

Dit document is gericht op het naadloos gebruik van de transactionele API van Kafka met Azure Event Hubs.

Notitie

Kafka Transactions is momenteel beschikbaar als openbare preview-versie in Premium en dedicated.

Transacties in Apache Kafka

In systeemeigen cloudomgevingen moeten toepassingen tolerant worden gemaakt voor netwerkonderbrekingen en opnieuw opstarten van naamruimten en upgrades. Toepassingen waarvoor strikte verwerkingsgaranties zijn vereist, moeten gebruikmaken van een transactioneel framework of API om ervoor te zorgen dat alle bewerkingen worden uitgevoerd, of dat er geen zijn zodat de toepassing en de status van de gegevens betrouwbaar worden beheerd. Als de set bewerkingen mislukt, kunnen ze op betrouwbare wijze opnieuw atomisch worden geprobeerd om ervoor te zorgen dat de juiste verwerkingsgaranties worden gegarandeerd.

Notitie

Transactionele garanties zijn doorgaans vereist wanneer er meerdere bewerkingen moeten worden verwerkt op een 'alles of niets'-manier.

Voor alle andere bewerkingen zijn clienttoepassingen standaard tolerant om de bewerking opnieuw uit te voeren met een exponentieel uitstel als de specifieke bewerking is mislukt.

Apache Kafka biedt een transactionele API om ervoor te zorgen dat dit verwerkingsniveau garanties biedt voor dezelfde of verschillende set onderwerp/partities.

Transacties zijn van toepassing op de onderstaande gevallen:

  • Transactionele producenten.
  • Precies één keer semantiek verwerken.

Transactionele producenten

Transactionele producenten zorgen ervoor dat gegevens atomisch naar meerdere partities in verschillende onderwerpen worden geschreven. Producenten kunnen een transactie initiëren, schrijven naar meerdere partities op hetzelfde onderwerp of in verschillende onderwerpen en vervolgens de transactie doorvoeren of afbreken.

Om ervoor te zorgen dat een producent transactioneel is, enable.idempotence moet deze worden ingesteld op waar om ervoor te zorgen dat de gegevens precies eenmaal worden geschreven, waardoor dubbele waarden aan de verzendzijde worden vermeden. Bovendien transaction.id moet worden ingesteld om de producent uniek te identificeren.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Zodra de producent is geïnitialiseerd, zorgt de onderstaande aanroep ervoor dat de producent zich registreert bij de broker als transactionele producent -

    producer.initTransactions();

De producent moet vervolgens expliciet een transactie starten, verzendbewerkingen uitvoeren op verschillende onderwerpen en partities als normaal, en vervolgens de transactie doorvoeren met de onderstaande aanroep –

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Als de transactie moet worden afgebroken vanwege een fout of een time-out, kan de producent de abortTransaction() methode aanroepen.

	producer.abortTransaction();

Precies eenmaal semantiek

Precies zodra semantiek voortbouwt op de transactionele producenten door consumenten toe te voegen aan het transactionele bereik van de producenten, zodat elke record gegarandeerd exact één keer wordt gelezen, verwerkt en geschreven.

Eerst wordt de transactionele producent geïnstantieerd -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Vervolgens moet de consument zodanig worden geconfigureerd dat alleen niet-transactionele berichten worden gelezen of transactionele berichten worden doorgevoerd door de onderstaande eigenschap in te stellen:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Zodra de consument is geïnstantieerd, kan deze zich abonneren op het onderwerp waar de records moeten worden gelezen –


    consumer.subscribe(singleton("inputTopic"));

Nadat de consument de records uit het invoeronderwerp heeft gepeild, begint de producent met het transactionele bereik waarin de record wordt verwerkt en naar het uitvoeronderwerp wordt geschreven. Zodra de records zijn geschreven, wordt de bijgewerkte toewijzing van offsets voor alle partities gemaakt. De producent verzendt deze bijgewerkte offsettoewijzing vervolgens naar de transactie voordat de transactie wordt doorgevoerd.

In elke uitzondering wordt de transactie afgebroken en probeert de producent de verwerking opnieuw atomisch te verwerken.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Waarschuwing

Als de transactie niet vóór de max.transaction.timeout.mstransactie is doorgevoerd of afgebroken, wordt de transactie automatisch afgebroken door Event Hubs. De standaardwaarde max.transaction.timeout.ms is ingesteld op 15 minuten door Event Hubs, maar de producent kan deze overschrijven naar een lagere waarde door de eigenschap in de configuratie-eigenschappen van de transaction.timeout.ms producent in te stellen.

Migratiehandleiding

Als u bestaande Kafka-toepassingen hebt die u wilt gebruiken met Azure Event Hubs, raadpleegt u de Kafka-migratiehandleiding voor Azure Event Hubs om snel aan de slag te gaan.

Volgende stappen

Zie de volgende artikelen voor meer informatie over Event Hubs en Event Hubs voor Kafka: