Integrowanie obsługi narzędzia Apache Kafka Connect w usłudze Azure Event Hubs
Apache Kafka Connect to struktura umożliwiająca łączenie i importowanie/eksportowanie danych z/do dowolnego systemu zewnętrznego, takiego jak MySQL, HDFS i system plików za pośrednictwem klastra Kafka. Ten artykuł przeprowadzi Cię przez proces korzystania z platformy Kafka Connect z usługą Event Hubs.
W tym artykule przedstawiono proces integrowania platformy Kafka Connect z centrum zdarzeń oraz wdrażania podstawowych FileStreamSource
łączników i FileStreamSink
łączników. Chociaż te łączniki nie są przeznaczone do użytku produkcyjnego, przedstawiają kompleksowe scenariusze platformy Kafka Connect, w którym usługa Azure Event Hubs działa jako broker platformy Kafka.
Uwaga
Ten przykład jest dostępny w witrynie GitHub.
Wymagania wstępne
Aby ukończyć ten przewodnik, upewnij się, że dysponujesz następującymi elementami:
- Subskrypcja platformy Azure. Jeśli jej nie masz, utwórz bezpłatne konto.
- Usługa Git
- Linux/macOS
- Najnowsza wersja platformy Kafka dostępna w kafka.apache.org
- Przeczytaj artykuł z wprowadzeniem Usługa Event Hubs dla platformy Apache Kafka
Tworzenie przestrzeni nazw usługi Event Hubs
Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).
Klonowanie projektu przykładowego
Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs
Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties
pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Ważne
Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING}
element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz Pobieranie parametry połączenia usługi Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Uruchamianie narzędzia Kafka Connect
W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.
connect-distributed.properties
Zapisz plik lokalnie. Zamień wszystkie wartości w nawiasach klamrowych.- Przejdź do lokalizacji platformy Kafka w maszynie.
- Uruchom program
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Pojawienie się tekstu'INFO Finished starting connectors and tasks'
oznacza, że interfejs API REST procesu roboczego narzędzia Connect jest gotowy do interakcji.
Uwaga
Platforma Kafka Connect używa interfejsu API AdminClient platformy Kafka do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Z szybkiego sprawdzenia przestrzeni nazw w witrynie Azure Portal wynika, że tematy wewnętrzne procesu roboczego narzędzia Connect zostały utworzone automatycznie.
Tematy wewnętrzne platformy Kafka Connect muszą używać kompaktowania. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli tematy programu Internal Connect są niepoprawnie skonfigurowane.
Tworzenie łączników
W tej sekcji opisano proces konfigurowania FileStreamSource
łączników i FileStreamSink
ich uruchamiania.
Utwórz katalog dla plików danych wejściowych i wyjściowych.
mkdir ~/connect-quickstart
Utwórz dwa pliki: jeden plik z danymi inicjacyjnymi
FileStreamSource
, z których łącznik odczytuje, a drugi, do którego zapisuje naszFileStreamSink
łącznik.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Utwórz
FileStreamSource
łącznik. Zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Centrum zdarzeń
connect-quickstart
powinno być widoczne w wystąpieniu usługi Event Hubs po uruchomieniu polecenia.Sprawdź stan łącznika źródła.
curl -s http://localhost:8083/connectors/file-source/status
Opcjonalnie możesz użyć Eksploratora usługi Service Bus, aby sprawdzić, czy zdarzenia dotarły do tematu
connect-quickstart
.Utwórz łącznik FileStreamSink. Ponownie zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Sprawdź stan łącznika ujścia.
curl -s http://localhost:8083/connectors/file-sink/status
Sprawdź, czy dane zostały zreplikowane między plikami oraz czy dane są takie same w obu plikach.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Czyszczenie
Platforma Kafka Connect tworzy tematy usługi Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które utrzymują się nawet po usunięciu klastra Connect. Jeśli ta trwałość nie jest wymagana, zalecamy usunięcie tych tematów. Możesz również usunąć connect-quickstart
usługi Event Hubs, które zostały utworzone podczas tego przewodnika.
Powiązana zawartość
Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: