Intégrer la prise en charge Apache Kafka Connect à Azure Event Hubs
Apache Kafka Connect est une infrastructure permettant de connecter et d’importer/exporter des données depuis/vers n’importe quel système externe (p. ex. : MySQL), HDFS, et système de fichiers via un cluster Kafka. Cet article vous montre comment utiliser le framework Kafka Connect avec Event Hubs.
Cet article vous guide tout au long de l’intégration de Kafka Connect avec un hub d’événements et du déploiement des connecteurs FileStreamSource
et FileStreamSink
de base. Bien que ces connecteurs ne soient pas destinés à une utilisation en production, ils présentent un scénario Kafka Connect de bout en bout où Azure Event Hubs agit comme un répartiteur Kafka.
Notes
Cet exemple est disponible sur GitHub.
Prérequis
Pour suivre cette procédure pas à pas, vérifiez que les conditions préalables ci-dessous sont bien remplies :
- Abonnement Azure. Si vous n’en avez pas, créez un compte gratuit.
- Git
- Linux/macOS
- Dernière version Kafka disponible à partir de kafka.apache.org
- Lisez l’article d’introduction Event Hubs pour Apache Kafka
Créer un espace de noms Event Hubs
Un espace de noms Event Hubs est requis pour échanger avec tout service Event Hubs. Pour obtenir des instructions sur la création d'un espace de noms et d'un Event Hub, consultez Créer un Event Hub. Obtenez la chaîne de connexion Event Hubs et le nom de domaine complet (FQDN) pour une utilisation ultérieure. Pour obtenir des instructions, consultez Obtenir une chaîne de connexion Event Hubs.
Cloner l’exemple de projet
Clonez le référentiel Azure Event Hubs et accédez au sous-dossier Tutoriels/Se connecter :
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configurer Kafka Connect pour Event Hubs
Une reconfiguration minimale est nécessaire pour rediriger le débit Kafka Connect de Kafka vers Event Hubs. L’exemple connect-distributed.properties
suivant montre comment configurer Connect pour s’authentifier et communiquer avec le point de terminaison Kafka sur Event Hubs :
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Important
Remplacez {YOUR.EVENTHUBS.CONNECTION.STRING}
par la chaîne de connexion de votre espace de noms Event Hubs. Pour savoir comment obtenir la chaîne de connexion, consultez Obtenir une chaîne de connexion Event Hubs. Voici un exemple de configuration : sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Exécuter Kafka Connect
Dans cette étape, un Worker Kafka Connect est démarré en local en mode distribué, à l’aide d’Event Hubs pour maintenir l’état du cluster.
- Enregistrez le fichier
connect-distributed.properties
localement. Veillez à remplacer toutes les valeurs entre accolades. - Accédez à l’emplacement de la version de Kafka sur votre ordinateur.
- Exécutez
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. L’API REST du Worker Connect est prêt pour l’interaction lorsque vous voyez'INFO Finished starting connectors and tasks'
.
Notes
Kafka Connect utilise l’API Kafka AdminClient pour créer automatiquement des rubriques avec les configurations recommandées, y compris le compactage. Une vérification rapide de l’espace de noms dans le portail Azure révèle que les rubriques internes du Worker Connect ont été créées automatiquement.
Les rubriques internes Kafka Connect doivent utiliser le compactage. L’équipe Event Hubs n’est pas responsable de la résolution des configurations incorrectes si les rubriques Connect internes ne sont pas correctement configurées.
Créer des connecteurs
Cette section vous guide tout au long de la mise en place des connecteurs FileStreamSource
et FileStreamSink
.
Créez un répertoire pour les fichiers de données d’entrée et de sortie.
mkdir ~/connect-quickstart
Créez deux fichiers : un avec les données de départ lues par le connecteur
FileStreamSource
et un autre où notre connecteurFileStreamSink
écrit.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Créez un connecteur
FileStreamSource
. Veillez à remplacer les accolades par le chemin d’accès de votre répertoire de base.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Vous devez voir le hub d’événements
connect-quickstart
sur votre instance Event Hubs après l’exécution de la commande.Vérifiez l’état du connecteur source.
curl -s http://localhost:8083/connectors/file-source/status
Si vous le souhaitez, vous pouvez utiliser Service Bus Explorer pour vérifier que les événements sont arrivés dans la rubrique
connect-quickstart
.Créez un connecteur FileStreamSink. Veillez de nouveau à remplacer les accolades par le chemin d’accès de votre répertoire de base.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Vérifiez l’état du connecteur récepteur.
curl -s http://localhost:8083/connectors/file-sink/status
Vérifiez que les données ont été répliquées entre les fichiers et que les données sont identiques entre les deux fichiers.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Nettoyage
Kafka Connect crée des rubriques Event Hubs pour stocker les configurations, les décalages et l’état qui persistent même après l’arrêt du cluster Connect. Sauf si cette persistance est souhaitée, nous vous recommandons de supprimer ces rubriques. Vous pouvez également supprimer les hubs d’événements connect-quickstart
qui ont été créés au cours de cette procédure pas à pas.
Contenu connexe
Pour plus d’informations sur Event Hubs pour Kafka, consultez les articles suivants :