Integrar o suporte do Apache Kafka Connect nos Hubs de Eventos do Azure
Apache Kafka Connect é uma estrutura para conectar e importar / exportar dados de / para qualquer sistema externo, como MySQL, HDFS e sistema de arquivos através de um cluster Kafka. Este artigo orienta você sobre o uso da estrutura Kafka Connect com Hubs de Eventos.
Este artigo orienta você na integração do Kafka Connect com um hub de eventos e na implantação de conectores básicos FileStreamSource
e FileStreamSink
de conectores. Embora esses conectores não se destinem ao uso em produção, eles demonstram um cenário Kafka Connect de ponta a ponta em que os Hubs de Eventos do Azure atuam como um broker Kafka.
Nota
Este exemplo está disponível no GitHub.
Pré-requisitos
Para concluir esta orientação, confirme que tem os seguintes pré-requisitos:
- Subscrição do Azure. Se não tiver uma subscrição, crie uma conta gratuita.
- Git
- Linux/macOS
- Última versão do Kafka disponível na kafka.apache.org
- Ler o artigo de introdução dos Event Hubs for Apache Kafka (Hubs de Eventos para Apache Kafka).
Criar um espaço de nomes dos Hubs de Eventos
É necessário um espaço de nomes dos Hubs de Eventos para enviar e receber a partir de qualquer serviços dos Hubs de Eventos. Consulte Criando um hub de eventos para obter instruções sobre como criar um namespace e um hub de eventos. Obtenha a cadeia de ligação dos Hubs de Eventos e o nome de domínio completamente qualificado (FQDN), para utilizar mais tarde. Para obter instruções, veja Get an Event Hubs connection string (Obter uma cadeia de ligação dos Hubs de Eventos).
Clonar o projeto de exemplo
Clone o repositório dos Hubs de Eventos do Azure e navegue para a subpasta tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configurar o Kafka Connect para os Hubs de Eventos
Ao redirecionar o débito do Kafka Connect do Kafka para os Hubs de Eventos, é necessário fazer uma reconfiguração mínima. O exemplo seguinte, connect-distributed.properties
, ilustra como configurar o Connect para se autenticar e comunicar com o ponto final do Kafka nos Hubs de Eventos:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Importante
Substitua {YOUR.EVENTHUBS.CONNECTION.STRING}
pela cadeia de conexão do namespace Hubs de Eventos. Para obter instruções sobre como obter a cadeia de conexão, consulte Obter uma cadeia de conexão de Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Executar o Kafka Connect
Neste passo, é iniciada uma função de trabalho do Kafka Connect localmente no modo distribuído e são utilizados os Hubs de Eventos para manter o estado do cluster.
- Salve o
connect-distributed.properties
arquivo localmente. Confirme que substitui todos os valores dentro das chavetas. - Navegue para a localização da versão do Kafka no computador.
- Execute o
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Quando vir'INFO Finished starting connectors and tasks'
, significa que a API REST da função de trabalho do Connect está pronta para interação.
Nota
O Kafka Connect usa a API Kafka AdminClient para criar automaticamente tópicos com configurações recomendadas, incluindo compactação. Uma rápida observação ao espaço de nomes no portal do Azure mostra que os tópicos internos da função de trabalho do Connect foram criados de forma automática.
Os tópicos internos do Kafka Connect devem usar compactação. A equipe de Hubs de Eventos não é responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.
Criar conectores
Esta seção orienta FileStreamSource
você através da rotação e FileStreamSink
conectores.
Crie um diretório para os ficheiros de dados de entrada e de saída.
mkdir ~/connect-quickstart
Crie dois arquivos: um arquivo com dados de propagação a partir do qual o
FileStreamSource
conector lê e outro no qual nossoFileStreamSink
conector grava.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Crie um
FileStreamSource
conector. Confirme que substitui as chavetas pelo caminho do seu diretório de raiz.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Você deve ver o hub
connect-quickstart
de eventos na instância dos Hubs de Eventos depois de executar o comando.Verifique o estado do conector de origem.
curl -s http://localhost:8083/connectors/file-source/status
Opcionalmente, você pode usar o Service Bus Explorer para verificar se os eventos chegaram ao
connect-quickstart
tópico.Crie um conector FileStreamSink. Mais uma vez, confirme que substitui as chavetas pelo caminho do seu diretório de raiz.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Verifique o estado do conector sink.
curl -s http://localhost:8083/connectors/file-sink/status
Confirme que os dados foram replicados entre os ficheiros e que são idênticos em ambos os ficheiros.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Limpeza
O Kafka Connect cria tópicos de Hubs de Eventos para armazenar configurações, deslocamentos e status que persistem mesmo após o cluster Connect ter sido removido. A menos que essa persistência seja desejada, recomendamos que você exclua esses tópicos. Você também pode querer excluir os connect-quickstart
Hubs de Eventos que foram criados durante esta explicação passo a passo.
Conteúdos relacionados
Para saber mais sobre os Hubs de Eventos para Kafka, consulte os seguintes artigos:
- Guia do desenvolvedor do Apache Kafka para Hubs de Eventos do Azure
- Explore samples on our GitHub (Explorar exemplos no nosso GitHub)