Integrar o suporte ao Apache Kafka Connect nos Hubs de Eventos do Azure
O Apache Kafka Connect é uma estrutura para conectar e importar/exportar dados de ou para qualquer sistema externo, como o MySQL, o HDFS e o sistema de arquivos, por meio de um cluster do Kafka. Este artigo explica como usar a estrutura do Kafka Connect com os Hubs de Eventos.
Este artigo explica como integrar o Kafka Connect a um hub de eventos e implantar conectores básicos FileStreamSource
e FileStreamSink
. Embora esses conectores não sirvam para uso em produção, eles demonstram um cenário Kafka Connect de ponta a ponta em que os Hubs de Eventos do Azure atuam como um agente do Kafka.
Observação
Este exemplo está disponível em GitHub.
Pré-requisitos
Para concluir este passo a passo, é necessário atender aos seguintes pré-requisitos:
- Assinatura do Azure. Se você não tiver uma, crie uma conta gratuita.
- Git
- Linux/macOS
- Versão mais recente do Kafka disponível no kafka.apache.org
- Leia o artigo introdutório Hubs de Eventos para o Apache Kafka
Criar um namespace dos Hubs de Eventos
É necessário um namespace do Hubs de Eventos para enviar e receber de qualquer serviço de Hub de Eventos. Para obter instruções sobre como criar um namespace e um hub de eventos, confira Criar um hub de eventos. Obtenha a cadeia de conexão dos Hubs de Eventos e o FQDN (nome de domínio totalmente qualificado) para uso posterior. Para obter instruções, confira Obter uma cadeia de conexão dos Hubs de Eventos.
Clonar o projeto de exemplo
Clone o repositório dos Hubs de Eventos do Azure e navegue até a subpasta tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configurar o Kafka Connect para os Hubs de Eventos
É necessário um mínimo de reconfiguração para redirecionar a taxa de transferência do Kafka Connect para os Hubs de Eventos. O exemplo connect-distributed.properties
abaixo ilustra como configurar o Connect para fazer a autenticação e se comunicar com o ponto de extremidade do Kafka nos Hubs de Eventos:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Importante
Substitua {YOUR.EVENTHUBS.CONNECTION.STRING}
pela cadeia de conexão do seu namespace dos Hubs de Eventos. Para ver as instruções sobre como obter uma cadeia de conexão, confira Obter cadeia de conexão para Hubs de Eventos. Aqui está um exemplo de configuração: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Executar o Kafka Connect
Nesta etapa, um trabalho do Kafka Connect é iniciado localmente em modo distribuído, usando os Hubs de Eventos para manter o estado do cluster.
- Salve o arquivo
connect-distributed.properties
localmente. Substitua todos os valores entre chaves. - Navegue até o local da versão do Kafka em seu computador.
- Execute
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. A API REST do trabalho do Connect estará pronta para interação quando você vir'INFO Finished starting connectors and tasks'
.
Observação
O Kafka Connect usa a API AdminClient do Kafka para criar automaticamente tópicos com configurações recomendadas, incluindo a compactação. Uma verificação rápida do namespace no portal do Azure revela que os tópicos internos do trabalho do Connect foram criados automaticamente.
Os tópicos internos do Kafka Connect precisam usar a compactação. A equipe dos Hubs de Eventos não será responsável por corrigir configurações inadequadas se os tópicos internos do Connect estiverem configurados incorretamente.
Criar conectores
Esta seção orienta você pelo processo de rotação dos conectores FileStreamSource
e FileStreamSink
.
Crie um diretório para os arquivos de dados de entrada e saída.
mkdir ~/connect-quickstart
Crie dois arquivos: um arquivo contendo dados semente que são lidos pelo conector
FileStreamSource
e outro no qual o conectorFileStreamSink
grava.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Criar um conector
FileStreamSource
. Substitua as chaves pelo caminho do diretório base.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Você deve ver o hub de eventos
connect-quickstart
em sua instância dos Hubs de Eventos depois de executar o comando.Verifique o status do conector de origem.
curl -s http://localhost:8083/connectors/file-source/status
Opcionalmente, você pode usar o Gerenciador do Barramento de Serviço para verificar se os eventos chegaram no tópico
connect-quickstart
.Crie um conector FileStreamSink. Mais uma vez, substitua as chaves pelo caminho do diretório base.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Verifique o status do conector do coletor.
curl -s http://localhost:8083/connectors/file-sink/status
Verifique se dados foram replicados entre os arquivos e se os dados são idênticos nos dois arquivos.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Limpeza
O Kafka Connect cria tópicos de Hubs de Eventos para armazenar status, deslocamentos e configurações que persistem mesmo depois que o cluster do Connect é desativado. A menos que essa persistência seja desejada, recomendamos que você exclua esses tópicos. Talvez seja ideal também excluir os Hubs de Eventos connect-quickstart
que foram criados no decorrer deste passo a passo.
Conteúdo relacionado
Para saber mais sobre os Hubs de Eventos para o Kafka, confira os artigos a seguir: