共用方式為


適用於 Azure 事件中樞的 Apache Kafka 交易

本文提供如何使用 Apache Kafka 交易式 API 搭配 Azure 事件中樞 的詳細數據。

概觀

事件中樞提供 Kafka 端點,可供您現有的 Kafka 用戶端應用程式用來作為執行您自己的 Kafka 叢集的替代方案。 事件中樞適用於許多現有的 Kafka 應用程式。 如需詳細資訊,請參閱 適用於 Apache Kafka 的事件中樞

本檔著重於如何順暢地使用 Kafka 的交易式 API 與 Azure 事件中樞。

注意

Kafka 交易目前處於 Premium 和專用層的公開預覽狀態。

Apache Kafka 中的交易

在雲端原生環境中,應用程式必須能夠復原網路中斷和命名空間重新啟動和升級。 需要嚴格處理保證的應用程式必須利用交易架構或 API 來確保執行所有作業,或沒有任何作業,以便可靠地管理應用程式和數據狀態。 如果一組作業失敗,則可以以不可部分完成的方式再次嘗試它們,以確保正確的處理保證。

注意

當需要以「全部」或「無」方式處理多個作業時,通常需要交易式保證。

針對所有其他作業,如果特定作業失敗,用戶端應用程式預設復原,以指數輪詢重試作業。

Apache Kafka 提供交易式 API,以確保跨相同或不同主題/數據分割集的處理保證層級。

交易適用於下列案例:

  • 交易式產生者。
  • 剛好處理語意一次。

交易產生者

交易產生者可確保數據會以不可部分完成的方式寫入到不同主題的多個分割區。 產生者可以起始交易、寫入相同主題或跨不同主題的多個分割區,然後認可或中止交易。

若要確保產生者是交易式的,enable.idempotence應該設定為 true,以確保數據會完全寫入一次,因此避免傳送端重複。 此外, transaction.id 應該設定為唯一識別產生者。

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

初始化產生者之後,下列呼叫可確保生產者向訊息代理程序註冊為交易式產生者 -

    producer.initTransactions();

產生者接著必須明確地開始交易、依正常方式跨不同主題和分割區執行傳送作業,然後使用下列呼叫認可交易 –

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

如果交易因錯誤或逾時而需要中止,則產生者可以呼叫 abortTransaction() 方法。

	producer.abortTransaction();

完全一次語意

完全一旦語意建置在交易產生者上,方法是在產生者的交易範圍中新增取用者,讓每個記錄都保證能完全讀取、處理及寫入 一次

首先,交易產生者會具現化 -


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

然後,取用者必須設定為只讀非交易訊息,或藉由設定下列屬性來認可交易式訊息 –


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

具現化取用者之後,即可從中訂閱必須讀取記錄的主題 –


    consumer.subscribe(singleton("inputTopic"));

取用者從輸入主題輪詢記錄之後,產生者會開始處理記錄並寫入輸出主題的交易範圍。 寫入記錄之後,就會建立所有數據分割的已更新位移對應。 接著,產生者會在認可交易之前,將這個更新的位移對應傳送至交易。

在任何例外狀況中,交易會中止,而產生者會以不可部分完成的方式重試處理。

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

警告

如果交易在 之前 max.transaction.timeout.ms未認可或中止,則事件中樞會自動中止交易。 默認 max.transaction.timeout.ms 會由事件中樞設定為 15分鐘 ,但產生者可以在產生者組態屬性中設定 transaction.timeout.ms 屬性,將其覆寫為較低的值。

移轉手冊

如果您有想要搭配 Azure 事件中樞 使用的現有 Kafka 應用程式,請檢閱 Kafka 移轉指南,以快速執行 Azure 事件中樞

下一步

若要深入了解事件中樞和適用於 Kafka 的事件中樞,請參閱下列文章: