Транзакции в Apache Kafka для Центры событий Azure
В этой статье содержатся сведения об использовании API транзакций Apache Kafka с Центры событий Azure.
Обзор
Центры событий предоставляют конечную точку Kafka, которая может использоваться существующими клиентскими приложениями Kafka в качестве альтернативы запуску собственного кластера Kafka. Центры событий работают со многими из ваших существующих приложений Kafka. Дополнительные сведения см. в разделе Центры событий для Apache Kafka.
В этом документе основное внимание уделяется использованию транзакционного API Kafka с Центры событий Azure без проблем.
Примечание.
Транзакции Kafka в настоящее время находятся в общедоступной предварительной версии в категории "Премиум" и "Выделенный".
Транзакции в Apache Kafka
В облачных средах приложения должны быть устойчивы к сбоям сети и перезапускам пространства имен и обновлениям. Приложения, требующие строгих гарантий обработки, должны использовать платформу транзакций или API, чтобы обеспечить выполнение всех операций или нет таким образом, чтобы приложение и состояние данных было надежно управляемым. Если набор операций завершается сбоем, они могут быть надежно проверены атомарны, чтобы обеспечить правильные гарантии обработки.
Примечание.
Обычно при наличии нескольких операций, которые должны обрабатываться в режиме "все или ничего", обычно требуются гарантии транзакций.
Для всех других операций клиентские приложения по умолчанию устойчивы к повторению операции с экспоненциальным обратным выходом, если конкретная операция завершилась ошибкой.
Apache Kafka предоставляет транзакционный API, чтобы обеспечить такой уровень гарантий обработки в одном или другом наборе разделов или разделов.
Транзакции применяются к следующим случаям:
- Производители транзакций.
- Точно после обработки семантики.
Производители транзакций
Производители транзакций гарантируют, что данные записываются атомарны в несколько разделов в разных разделах. Производители могут инициировать транзакцию, записывать в несколько секций в одном разделе или в разных разделах, а затем фиксировать или прерывать транзакцию.
Чтобы убедиться, что производитель транзакционен, enable.idempotence
необходимо задать значение true, чтобы убедиться, что данные записываются ровно один раз, избегая дублирования на стороне отправки . Кроме того, transaction.id
необходимо задать уникальное определение производителя.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
После инициализации производителя приведенный ниже вызов гарантирует, что производитель регистрируется в брокере в качестве производителя транзакций -
producer.initTransactions();
Затем производитель должен явно начать транзакцию, выполнять операции отправки в разных разделах и секциях как обычные, а затем зафиксировать транзакцию с помощью следующего вызова:
producer.beginTransaction();
/*
Send to multiple topic partitions.
*/
producer.commitTransaction();
Если транзакция должна быть прервана из-за сбоя или времени ожидания, производитель может вызвать abortTransaction()
метод.
producer.abortTransaction();
Точно один раз семантика
Точно после того, как семантика строится на транзакционных производителях путем добавления потребителей в область транзакций производителей, поэтому каждая запись гарантированно будет считываться, обрабатываться и записываться ровно один раз.
Сначала создается экземпляр производителя транзакций.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "transactional-producer-1");
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
producer.initTransactions();
Затем потребитель должен быть настроен для чтения только нетрансакционных сообщений или фиксации транзакционных сообщений, задав следующее свойство:
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
После создания экземпляра потребителя он может подписаться на раздел, из которого должны быть прочитаны записи.
consumer.subscribe(singleton("inputTopic"));
После того, как потребитель опрашивает записи из входного раздела, производитель начинает транзакционные области, в которой запись обрабатывается и записывается в выходной раздел. После записи записей создается обновленная карта смещения для всех секций. Затем производитель отправляет обновленную карту смещения в транзакцию перед фиксацией транзакции.
В любом исключении транзакция прерывается, и производитель повторяет обработку снова атомарно.
while (true) {
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
producer.beginTransaction();
try {
for (ConsumerRecord record : records) {
/*
Process record as appropriate
*/
// Write to output topic
producer.send(producerRecord(“outputTopic”, record));
}
/*
Generate the offset map to be committed.
*/
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
for (TopicPartition partition : records.partitions()) {
// Calculate the offset to commit and populate the map.
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
}
// send offsets to transaction and then commit the transaction.
producer.sendOffsetsToTransaction(offsetsToCommit, group);
producer.commitTransaction();
} catch (Exception e)
{
producer.abortTransaction();
}
}
Предупреждение
Если транзакция не зафиксирована или прервана до этого max.transaction.timeout.ms
, транзакция будет прервана центрами событий автоматически. По умолчанию max.transaction.timeout.ms
задано значение 15 минут центрами событий, но производитель может переопределить его на меньшее значение, задав transaction.timeout.ms
свойство в свойствах конфигурации производителя.
Руководство по миграции
Если у вас есть существующие приложения Kafka, которые вы хотите использовать с Центры событий Azure, ознакомьтесь с руководством по миграции Kafka, чтобы Центры событий Azure быстро запустить работу.
Следующие шаги
Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях: