다음을 통해 공유


Azure Event Hubs용 Apache Kafka의 트랜잭션

이 문서에서는 Azure Event Hubs에서 Apache Kafka 트랜잭션 API를 사용하는 방법에 대해 자세히 설명합니다.

개요

Event Hubs는 사용자 고유의 Kafka 클러스터를 실행하는 대신 기존 Kafka 클라이언트 애플리케이션에서 사용할 수 있는 Kafka 엔드포인트를 제공합니다. Event Hubs는 대부분의 기존 Kafka 애플리케이션에서 작동합니다. 자세한 내용은 Apache Kafka용 Event Hubs를 참조하세요.

이 문서에서는 Azure Event Hubs에서 Kafka의 트랜잭션 API를 원활하게 사용하는 방법에 중점을 둡니다.

참고 항목

Kafka 트랜잭션은 현재 프리미엄 및 전용 계층에서 공개 미리 보기로 제공됩니다.

Apache Kafka의 트랜잭션

클라우드 네이티브 환경에서 애플리케이션은 네트워크 중단 및 네임스페이스 다시 시작 및 업그레이드에 복원력을 유지해야 합니다. 엄격한 처리 보장이 필요한 애플리케이션은 트랜잭션 프레임워크 또는 API를 활용하여 모든 작업이 실행되거나 애플리케이션 및 데이터 상태가 안정적으로 관리되도록 보장하지 않아야 합니다. 작업 집합이 실패하는 경우 올바른 처리 보장을 보장하기 위해 원자성으로 다시 안정적으로 다시 시도할 수 있습니다.

참고 항목

트랜잭션 보장은 일반적으로 "전부 또는 전혀" 방식으로 처리해야 하는 여러 작업이 있는 경우에 필요합니다.

다른 모든 작업의 경우 클라이언트 애플리케이션은 기본적으로 복원력이 있어 특정 작업이 실패한 경우 지수 백오프를 사용하여 작업을 다시 시도합니다.

Apache Kafka는 동일한 또는 다른 토픽/파티션 집합에서 이러한 수준의 처리가 보장되도록 하는 트랜잭션 API를 제공합니다.

트랜잭션은 아래 사례에 적용됩니다.

  • 트랜잭션 생산자.
  • 정확히 한 번 처리 의미 체계.

트랜잭션 생산자

트랜잭션 생산자는 데이터가 여러 토픽의 여러 파티션에 원자성으로 기록되도록 합니다. 생산자는 트랜잭션을 시작하고, 동일한 토픽 또는 여러 토픽에서 여러 파티션에 쓴 다음, 트랜잭션을 커밋하거나 중단할 수 있습니다.

생산자가 트랜잭션 enable.idempotence 인지 확인하려면 데이터가 정확히 한 번 기록되도록 true로 설정하여 송신 쪽에서 중복을 방지해야 합니다. 또한 transaction.id 생산자를 고유하게 식별하도록 설정해야 합니다.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

생산자가 초기화되면 아래 호출은 생산자가 브로커에 트랜잭션 생산자로 등록하도록 합니다.

    producer.initTransactions();

그런 다음 생산자는 트랜잭션을 명시적으로 시작하고, 다른 토픽 및 파티션에서 정상적으로 보내기 작업을 수행한 다음, 아래 호출을 사용하여 트랜잭션을 커밋해야 합니다.

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

오류 또는 시간 제한으로 인해 트랜잭션을 중단해야 하는 경우 생산자는 메서드를 호출할 abortTransaction() 수 있습니다.

	producer.abortTransaction();

정확히 한 번 의미 체계

정확히 한 번 의미 체계는 생산자의 트랜잭션 범위에 소비자를 추가하여 트랜잭션 생산자를 기반으로 구축되므로 각 레코드를 정확히 한 번 읽고 처리하고 쓸 수 있습니다.

먼저 트랜잭션 생산자가 인스턴스화됩니다.


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

그런 다음 소비자가 비전송 메시지만 읽도록 구성하거나 아래 속성을 설정하여 커밋된 트랜잭션 메시지를 읽도록 구성해야 합니다.


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

소비자가 인스턴스화되면 레코드를 읽어야 하는 토픽을 구독할 수 있습니다.


    consumer.subscribe(singleton("inputTopic"));

소비자가 입력 항목의 레코드를 폴링한 후 생산자는 레코드가 처리되고 출력 토픽에 기록되는 트랜잭션 범위를 시작합니다. 레코드가 작성되면 모든 파티션에 대한 오프셋의 업데이트된 맵이 만들어집니다. 그런 다음 생산자는 트랜잭션을 커밋하기 전에 이 업데이트된 오프셋 맵을 트랜잭션에 보냅니다.

예외적으로 트랜잭션은 중단되고 생산자는 처리를 다시 원자성으로 다시 시도합니다.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Warning

트랜잭션이 커밋되거나 중단되지 max.transaction.timeout.ms않은 경우 Event Hubs에서 트랜잭션을 자동으로 중단합니다. 기본값 max.transaction.timeout.ms 은 Event Hubs에서 15분 으로 설정되지만 생산자는 생산자 구성 속성의 속성을 설정하여 더 낮은 값으로 재정의 transaction.timeout.ms 할 수 있습니다.

마이그레이션 가이드

Azure Event Hubs와 함께 사용하려는 기존 Kafka 애플리케이션이 있는 경우 Azure Event Hubs에 대한 Kafka 마이그레이션 가이드를 검토하여 빠르게 실행되는 땅에 도달하세요.

다음 단계

Event Hubs 및 Kafka용 Event Hubs에 대해 자세한 내용은 다음 문서를 참조하세요.