Compartilhar via


Transações no Apache Kafka para os Hubs de Eventos do Azure

Este artigo fornece detalhes sobre como usar a API transacional do Apache Kafka com os Hubs de Eventos do Azure.

Visão geral

Os Hubs de Eventos fornecem um ponto de extremidade Kafka que pode ser usado por seus aplicativos cliente Kafka existentes como uma alternativa para executar seu próprio cluster Kafka. Os Hubs de Eventos funcionam com vários aplicativos Kafka existentes. Para saber mais, consulte Hubs de Eventos para o Apache Kafka.

Este documento se concentra em como usar a API transacional do Kafka com os Hubs de Eventos do Azure de forma integrada.

Observação

No momento, as transações do Kafka estão em versão prévia pública nas camadas Premium e Dedicada.

Transações no Apache Kafka

Em ambientes nativos de nuvem, os aplicativos devem ser resilientes a interrupções de rede e reinicializações e atualizações de namespace. Os aplicativos que exigem garantias rigorosas de processamento devem utilizar uma estrutura ou API transacional para garantir que todas as operações sejam executadas ou nenhuma delas o seja, de forma que o estado do aplicativo e dos dados seja gerenciado de maneira confiável. Se o conjunto de operações falhar, ele pode ser confiavelmente refeito de forma atômica para garantir que o processamento seja corretamente garantido.

Observação

As garantias transacionais normalmente são necessárias quando há várias operações que precisam ser processadas de forma "tudo ou nada".

Para todas as outras operações, os aplicativos clientes são resilientes por padrão, tentando a operação novamente com uma estratégia de retirada exponencial, caso a operação específica falhe.

O Apache Kafka oferece uma API transacional para garantir esse nível de garantias de processamento em um mesmo conjunto ou em conjuntos diferentes de tópicos/partições.

As transações se aplicam aos casos abaixo:

  • Produtores transacionais.
  • Semântica de processamento exatamente uma vez.

Produtores transacionais

Os produtores transacionais garantem que os dados sejam gravados atomicamente em várias partições em diferentes tópicos. Os produtores podem iniciar uma transação, gravar em várias partições em um mesmo tópico ou em tópicos diferentes, e então confirmar ou anular a transação.

Para garantir que um produtor seja transacional, o enable.idempotence deve ser definido como “true” para assegurar que os dados sejam gravados exatamente uma vez, evitando assim duplicatas no lado de envio. Além disso, transaction.id deve ser definido para identificar exclusivamente o produtor.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Depois que o produtor está inicializado, a chamada abaixo garante que o produtor se registre com o agente como um produtor transacional:

    producer.initTransactions();

Em seguida, o produtor deve iniciar uma transação explicitamente, realizar operações de envio em diferentes tópicos e partições como de costume, e então confirmar a transação com a chamada abaixo:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Se a transação precisar ser anulada devido a uma falha ou um tempo limite, o produtor poderá chamar o método abortTransaction().

	producer.abortTransaction();

Semântica exatamente uma vez

A semântica de exatamente uma vez se baseia nos produtores transacionais, adicionando consumidores ao escopo transacional dos produtores, para que cada registro seja garantido ser lido, processado e gravado exatamente uma vez.

Primeiro, o produtor transacional é instanciado:


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Em seguida, o consumidor deve ser configurado para ler apenas mensagens não transacionais ou mensagens transacionais confirmadas definindo a propriedade abaixo:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Depois que o consumidor for instanciado, ele poderá assinar o tópico de onde os registros devem ser lidos:


    consumer.subscribe(singleton("inputTopic"));

Após o consumidor buscar os registros do tópico de entrada, o produtor inicia o escopo transacional dentro do qual o registro é processado e gravado no tópico de saída. Depois que os registros são gravados, o mapa atualizado de deslocamentos para todas as partições é criado. Em seguida, o produtor envia esse mapa de deslocamento atualizado para a transação antes de confirmar a transação.

Em caso de qualquer exceção, a transação é anulada e o produtor tenta processar novamente de forma atômica.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Aviso

Se a transação não for confirmada ou anulada antes de max.transaction.timeout.ms, a transação será anulada pelos Hubs de Eventos automaticamente. O padrão max.transaction.timeout.ms é definido como 15 minutos pelos Hubs de Eventos, mas o produtor pode substituí-lo por um valor menor definindo a propriedade transaction.timeout.ms nas propriedades de configuração do produtor.

Guia de Migração

Se você tiver aplicativos Kafka existentes que gostaria de usar com os Hubs de Eventos do Azure, examine o guia de migração do Kafka para o Azure Event Hubs para começar rapidamente.

Próximas etapas

Para saber mais sobre os Hubs de Eventos e Hubs de Eventos para o Kafka, confira os artigos a seguir: