Udostępnij za pośrednictwem


Transakcje na platformie Apache Kafka dla usługi Azure Event Hubs

Ten artykuł zawiera szczegółowe informacje na temat używania transakcyjnego interfejsu API platformy Apache Kafka z usługą Azure Event Hubs.

Omówienie

Usługa Event Hubs udostępnia punkt końcowy platformy Kafka, który może być używany przez istniejące aplikacje klienckie platformy Kafka jako alternatywę dla uruchamiania własnego klastra platformy Kafka. Usługa Event Hubs współpracuje z wieloma istniejącymi aplikacjami platformy Kafka. Aby uzyskać więcej informacji, zobacz Event Hubs for Apache Kafka (Usługa Event Hubs dla platformy Apache Kafka).

Ten dokument koncentruje się na tym, jak bezproblemowo korzystać z transakcyjnego interfejsu API platformy Kafka z usługą Azure Event Hubs.

Uwaga

Transakcje platformy Kafka są obecnie dostępne w publicznej wersji zapoznawczej w warstwie Premium i w warstwie Dedykowane.

Transakcje na platformie Apache Kafka

W środowiskach natywnych dla chmury aplikacje muszą być odporne na zakłócenia sieci i ponowne uruchamianie przestrzeni nazw i uaktualnienia. Aplikacje wymagające rygorystycznych gwarancji przetwarzania muszą korzystać z platformy transakcyjnej lub interfejsu API, aby upewnić się, że wszystkie operacje są wykonywane, lub nie są tak, aby stan aplikacji i danych był niezawodnie zarządzany. Jeśli zestaw operacji nie powiedzie się, można je niezawodnie próbować ponownie niepodziealnie, aby zapewnić odpowiednie gwarancje przetwarzania.

Uwaga

Gwarancje transakcyjne są zwykle wymagane, gdy istnieje wiele operacji, które należy przetworzyć w sposób "wszystkie lub nic".

W przypadku wszystkich innych operacji aplikacje klienckie są domyślnie odporne, aby ponowić próbę wykonania operacji z wycofywaniem wykładniczym, jeśli określona operacja nie powiodła się.

Platforma Apache Kafka udostępnia transakcyjny interfejs API w celu zapewnienia tego poziomu gwarancji przetwarzania w tym samym lub innym zestawie tematów/partycji.

Transakcje mają zastosowanie do poniższych przypadków:

  • Producenci transakcyjni.
  • Dokładnie raz przetwarzania semantyki.

Producenci transakcyjni

Producenci transakcyjni zapewniają, że dane są zapisywane niepodziealnie w wielu partycjach w różnych tematach. Producenci mogą zainicjować transakcję, zapisać w wielu partycjach w tym samym temacie lub w różnych tematach, a następnie zatwierdzić lub przerwać transakcję.

Aby zapewnić, że producent jest transakcyjny, należy ustawić wartość true, enable.idempotence aby upewnić się, że dane są zapisywane dokładnie raz, unikając w ten sposób duplikatów po stronie wysyłania. transaction.id Ponadto należy ustawić opcję unikatowego identyfikowania producenta.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Po zainicjowaniu producenta poniższe wywołanie gwarantuje, że producent rejestruje się w brokerze jako producent transakcyjny -

    producer.initTransactions();

Producent musi następnie jawnie rozpocząć transakcję, wykonać operacje wysyłania w różnych tematach i partycjach w zwykły sposób, a następnie zatwierdzić transakcję za pomocą poniższego wywołania —

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Jeśli transakcja musi zostać przerwana z powodu błędu lub przekroczenia limitu czasu, producent może wywołać metodę abortTransaction() .

	producer.abortTransaction();

Dokładnie raz semantyka

Dokładnie raz semantyka opiera się na producentach transakcyjnych, dodając konsumentów w zakresie transakcyjnym producentów, aby każdy rekord był odczytywany, przetwarzany i zapisywany dokładnie raz.

Najpierw tworzone jest wystąpienie producenta transakcyjnego -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Następnie użytkownik musi być skonfigurowany do odczytu tylko nietransakcyjnych komunikatów lub zatwierdzonych komunikatów transakcyjnych, ustawiając poniższą właściwość —


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Po utworzeniu wystąpienia użytkownika może subskrybować temat, z którego muszą być odczytywane rekordy —


    consumer.subscribe(singleton("inputTopic"));

Gdy konsument sonduje rekordy z tematu wejściowego, producent rozpoczyna zakres transakcyjny, w którym rekord jest przetwarzany i zapisywany w temacie wyjściowym. Po zapisaniu rekordów zostanie utworzona zaktualizowana mapa przesunięć dla wszystkich partycji. Następnie producent wysyła tę zaktualizowaną mapę przesunięcia do transakcji przed zatwierdzeniem transakcji.

W każdym wyjątku transakcja zostaje przerwana, a producent ponawia próbę przetwarzania po raz kolejny niepodziealnie.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Ostrzeżenie

Jeśli transakcja nie zostanie zatwierdzona lub przerwana przed max.transaction.timeout.ms, transakcja zostanie przerwana automatycznie przez usługę Event Hubs. Wartość domyślna max.transaction.timeout.ms to 15 minut przez usługę Event Hubs, ale producent może zastąpić ją niższą wartością, ustawiając transaction.timeout.ms właściwość we właściwościach konfiguracji producenta.

Przewodnik migracji

Jeśli masz istniejące aplikacje platformy Kafka, których chcesz używać z usługą Azure Event Hubs, zapoznaj się z przewodnikiem migracji platformy Kafka dla usługi Azure Event Hubs, aby szybko uruchomić usługę Azure Event Hubs .

Następne kroki

Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: