Dela via


Transaktioner i Apache Kafka för Azure Event Hubs

Den här artikeln innehåller information om hur du använder Apache Kafka-transaktions-API:et med Azure Event Hubs.

Översikt

Event Hubs tillhandahåller en Kafka-slutpunkt som kan användas av dina befintliga Kafka-klientprogram som ett alternativ till att köra ditt eget Kafka-kluster. Event Hubs fungerar med många av dina befintliga Kafka-program. Mer information finns i Event Hubs för Apache Kafka.

Det här dokumentet fokuserar på hur du använder Kafkas transaktions-API med Azure Event Hubs sömlöst.

Kommentar

Kafka-transaktioner finns för närvarande i offentlig förhandsversion på premium- och dedikerad nivå.

Transaktioner i Apache Kafka

I molnbaserade miljöer måste program göras motståndskraftiga mot nätverksstörningar och omstarter och uppgraderingar av namnområdet. Program som kräver strikta bearbetningsgarantier måste använda ett transaktionsramverk eller API för att säkerställa att alla åtgärder körs, eller så är inget så att programmet och datatillståndet hanteras på ett tillförlitligt sätt. Om uppsättningen med åtgärder misslyckas kan de provas igen atomiskt för att säkerställa rätt bearbetningsgarantier.

Kommentar

Transaktionsgarantier krävs vanligtvis när det finns flera åtgärder som behöver bearbetas på ett "allt eller inget"-sätt.

För alla andra åtgärder är klientprogram motståndskraftiga som standard för att försöka utföra åtgärden igen med en exponentiell backoff, om den specifika åtgärden misslyckades.

Apache Kafka tillhandahåller ett transaktions-API för att säkerställa den här nivån av bearbetningsgarantier i samma eller olika uppsättningar av ämnen/partitioner.

Transaktioner gäller för nedanstående fall:

  • Transaktionsproducenter.
  • Exakt en gång bearbetas semantik.

Transaktionsproducenter

Transaktionsproducenter ser till att data skrivs atomiskt till flera partitioner i olika ämnen. Producenter kan initiera en transaktion, skriva till flera partitioner i samma ämne eller i olika ämnen och sedan checka in eller avbryta transaktionen.

För att säkerställa att en producent är transaktionell enable.idempotence bör det anges till sant för att säkerställa att data skrivs exakt en gång, vilket undviker dubbletter på sändningssidan. Dessutom transaction.id bör ställas in för att unikt identifiera producenten.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

När producenten har initierats ser nedanstående anrop till att producenten registrerar sig hos mäklaren som transaktionsproducent -

    producer.initTransactions();

Producenten måste sedan påbörja en transaktion explicit, utföra sändningsåtgärder i olika ämnen och partitioner som vanligt och sedan checka in transaktionen med anropet nedan –

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Om transaktionen måste avbrytas på grund av ett fel eller en tidsgräns kan producenten anropa abortTransaction() metoden.

	producer.abortTransaction();

Exakt en gång semantik

Exakt när semantiken bygger på transaktionsproducenterna genom att lägga till konsumenter i producenternas transaktionsomfång, så att varje post garanteras att läsas, bearbetas och skrivas exakt en gång.

Först instansieras transaktionsproducenten -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Sedan måste konsumenten konfigureras för att endast läsa icke-transaktionella meddelanden eller checka in transaktionsmeddelanden genom att ange egenskapen nedan –


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

När konsumenten har instansierats kan den prenumerera på ämnet där posterna måste läsas –


    consumer.subscribe(singleton("inputTopic"));

När konsumenten har avsökt posterna från indataavsnittet börjar producenten det transaktionsomfång inom vilket posten bearbetas och skrivs till utdataavsnittet. När posterna har skrivits skapas den uppdaterade kartan över förskjutningar för alla partitioner. Producenten skickar sedan den uppdaterade förskjutningskartan till transaktionen innan transaktionen genomförs.

I alla undantag avbryts transaktionen och producenten försöker bearbeta igen atomiskt.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Varning

Om transaktionen varken har checkats in eller avbrutits före max.transaction.timeout.msavbryts transaktionen automatiskt av Event Hubs. Standardvärdet max.transaction.timeout.ms anges till 15 minuter av Event Hubs, men producenten kan åsidosätta det till ett lägre värde genom att ange transaction.timeout.ms egenskapen i producentkonfigurationsegenskaperna.

Migreringsguide

Om du har befintliga Kafka-program som du vill använda med Azure Event Hubs kan du läsa Kafka-migreringsguiden för Azure Event Hubs för att snabbt komma igång.

Nästa steg

Mer information om Event Hubs och Event Hubs för Kafka finns i följande artiklar: