Transakce v Apache Kafka pro Azure Event Hubs
Tento článek obsahuje podrobné informace o tom, jak používat transakční rozhraní API Apache Kafka se službou Azure Event Hubs.
Přehled
Event Hubs poskytuje koncový bod Kafka, který můžou vaše stávající klientské aplikace Kafka používat jako alternativu ke spuštění vlastního clusteru Kafka. Event Hubs funguje s mnoha vašimi existujícími aplikacemi Kafka. Další informace najdete v tématu Event Hubs pro Apache Kafka.
Tento dokument se zaměřuje na to, jak bez problémů používat transakční rozhraní API Kafka se službou Azure Event Hubs.
Poznámka:
Transakce Kafka jsou aktuálně ve verzi Public Preview ve verzi Premium a na úrovni Dedicated.
Transakce v Apache Kafka
V nativních cloudových prostředích musí být aplikace odolné vůči přerušení sítě a restartování a upgradům oboru názvů. Aplikace vyžadující striktní záruky zpracování musí využívat transakční architekturu nebo rozhraní API, aby se zajistilo, že se spustí všechny operace, nebo žádné z nich tak, aby byla aplikace a stav dat spolehlivě spravovány. Pokud sada operací selže, je možné je spolehlivě zopakovat atomicky, aby se zajistily správné záruky zpracování.
Poznámka:
Transakční záruky se obvykle vyžadují, pokud existuje více operací, které je potřeba zpracovat "vše nebo nic".
U všech ostatních operací jsou klientské aplikace ve výchozím nastavení odolné proti opakování operace s exponenciálním zpomalováním, pokud konkrétní operace selhala.
Apache Kafka poskytuje transakční rozhraní API pro zajištění této úrovně záruk zpracování napříč stejnou nebo jinou sadou témat a oddílů.
Transakce platí pro následující případy:
- Transakční producenti.
- Přesně jednou zpracování sémantiky.
Transakční producenti
Transakční producenti zajišťují, aby data byla zapisována atomicky do více oddílů v různých tématech. Producenti mohou zahájit transakci, zapisovat do více oddílů ve stejném tématu nebo v různých tématech a pak transakce potvrdit nebo přerušit.
Pokud chcete zajistit, aby byl producent transakční, měl by být nastaven na hodnotu true, enable.idempotence
aby se zajistilo, že se data zapisují přesně jednou, čímž se vyhnete duplicitám na straně odeslání . Kromě toho transaction.id
by mělo být nastaveno jedinečné identifikaci producenta.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
Jakmile je producent inicializován, následující volání zajistí, že se producent zaregistruje u zprostředkovatele jako transakční producent -
producer.initTransactions();
Producent pak musí zahájit transakci explicitně, provádět operace odesílání napříč různými tématy a oddíly jako obvykle, a pak potvrdit transakci s následujícím voláním –
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
Pokud je nutné transakci přerušit kvůli chybě nebo vypršení časového limitu, může producent volat metodu abortTransaction()
.
producer.abortTransaction();
Přesně jednou sémantika
Přesně jakmile sémantika staví na transakčních producentech přidáním spotřebitelů do transakčního rozsahu producentů, aby každý záznam byl zaručen ke čtení, zpracování a zápisu přesně jednou.
Nejprve se vytvoří instance transakčního producenta -
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
Pak musí být příjemce nakonfigurován tak, aby četl pouze netransakční zprávy nebo potvrzené transakční zprávy nastavením následující vlastnosti –
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
Jakmile se vytvoří instance příjemce, může se přihlásit k odběru tématu, ze kterého se musí záznamy číst –
consumer.subscribe(singleton("inputTopic"));
Jakmile příjemce dotazuje záznamy ze vstupního tématu, producent zahájí transakční obor, ve kterém se záznam zpracuje a zapíše do výstupního tématu. Po zápisu záznamů se vytvoří aktualizovaná mapa posunů pro všechny oddíly. Producent pak odešle tuto aktualizovanou odsazení mapování na transakci před potvrzením transakce.
V jakékoli výjimce je transakce přerušena a producent znovu opakuje zpracování znovu atomicky.
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord(“outputTopic”, record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
Upozorňující
Pokud transakce není potvrzena nebo přerušena před , max.transaction.timeout.ms
transakce bude přerušena službou Event Hubs automaticky. Služba Event Hubs nastaví výchozí max.transaction.timeout.ms
hodnotu na 15 minut , ale producent ji může přepsat na nižší hodnotu nastavením transaction.timeout.ms
vlastnosti konfigurace producenta.
Průvodce migrací
Pokud máte existující aplikace Kafka, které chcete použít se službou Azure Event Hubs, projděte si průvodce migrací kafka pro Službu Azure Event Hubs a rychle se ho zprovozněte.
Další kroky
Další informace o službě Event Hubs a Event Hubs pro Kafka najdete v následujících článcích: