Поделиться через


Транзакции в Apache Kafka для Центры событий Azure

В этой статье содержатся сведения об использовании API транзакций Apache Kafka с Центры событий Azure.

Обзор

Центры событий предоставляют конечную точку Kafka, которая может использоваться существующими клиентскими приложениями Kafka в качестве альтернативы запуску собственного кластера Kafka. Центры событий работают со многими из ваших существующих приложений Kafka. Дополнительные сведения см. в разделе Центры событий для Apache Kafka.

В этом документе основное внимание уделяется использованию транзакционного API Kafka с Центры событий Azure без проблем.

Примечание.

Транзакции Kafka в настоящее время находятся в общедоступной предварительной версии в категории "Премиум" и "Выделенный".

Транзакции в Apache Kafka

В облачных средах приложения должны быть устойчивы к сбоям сети и перезапускам пространства имен и обновлениям. Приложения, требующие строгих гарантий обработки, должны использовать платформу транзакций или API, чтобы обеспечить выполнение всех операций или нет таким образом, чтобы приложение и состояние данных было надежно управляемым. Если набор операций завершается сбоем, они могут быть надежно проверены атомарны, чтобы обеспечить правильные гарантии обработки.

Примечание.

Обычно при наличии нескольких операций, которые должны обрабатываться в режиме "все или ничего", обычно требуются гарантии транзакций.

Для всех других операций клиентские приложения по умолчанию устойчивы к повторению операции с экспоненциальным обратным выходом, если конкретная операция завершилась ошибкой.

Apache Kafka предоставляет транзакционный API, чтобы обеспечить такой уровень гарантий обработки в одном или другом наборе разделов или разделов.

Транзакции применяются к следующим случаям:

  • Производители транзакций.
  • Точно после обработки семантики.

Производители транзакций

Производители транзакций гарантируют, что данные записываются атомарны в несколько разделов в разных разделах. Производители могут инициировать транзакцию, записывать в несколько секций в одном разделе или в разных разделах, а затем фиксировать или прерывать транзакцию.

Чтобы убедиться, что производитель транзакционен, enable.idempotence необходимо задать значение true, чтобы убедиться, что данные записываются ровно один раз, избегая дублирования на стороне отправки . Кроме того, transaction.id необходимо задать уникальное определение производителя.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

После инициализации производителя приведенный ниже вызов гарантирует, что производитель регистрируется в брокере в качестве производителя транзакций -

    producer.initTransactions();

Затем производитель должен явно начать транзакцию, выполнять операции отправки в разных разделах и секциях как обычные, а затем зафиксировать транзакцию с помощью следующего вызова:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Если транзакция должна быть прервана из-за сбоя или времени ожидания, производитель может вызвать abortTransaction() метод.

	producer.abortTransaction();

Точно один раз семантика

Точно после того, как семантика строится на транзакционных производителях путем добавления потребителей в область транзакций производителей, поэтому каждая запись гарантированно будет считываться, обрабатываться и записываться ровно один раз.

Сначала создается экземпляр производителя транзакций.


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Затем потребитель должен быть настроен для чтения только нетрансакционных сообщений или фиксации транзакционных сообщений, задав следующее свойство:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

После создания экземпляра потребителя он может подписаться на раздел, из которого должны быть прочитаны записи.


    consumer.subscribe(singleton("inputTopic"));

После того, как потребитель опрашивает записи из входного раздела, производитель начинает транзакционные области, в которой запись обрабатывается и записывается в выходной раздел. После записи записей создается обновленная карта смещения для всех секций. Затем производитель отправляет обновленную карту смещения в транзакцию перед фиксацией транзакции.

В любом исключении транзакция прерывается, и производитель повторяет обработку снова атомарно.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Предупреждение

Если транзакция не зафиксирована или прервана до этого max.transaction.timeout.ms, транзакция будет прервана центрами событий автоматически. По умолчанию max.transaction.timeout.ms задано значение 15 минут центрами событий, но производитель может переопределить его на меньшее значение, задав transaction.timeout.ms свойство в свойствах конфигурации производителя.

Руководство по миграции

Если у вас есть существующие приложения Kafka, которые вы хотите использовать с Центры событий Azure, ознакомьтесь с руководством по миграции Kafka, чтобы Центры событий Azure быстро запустить работу.

Следующие шаги

Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в следующих статьях: