Udostępnij za pośrednictwem


Integrowanie obsługi narzędzia Apache Kafka Connect w usłudze Azure Event Hubs

Apache Kafka Connect to struktura umożliwiająca łączenie i importowanie/eksportowanie danych z/do dowolnego systemu zewnętrznego, takiego jak MySQL, HDFS i system plików za pośrednictwem klastra Kafka. Ten artykuł przeprowadzi Cię przez proces korzystania z platformy Kafka Connect z usługą Event Hubs.

W tym artykule przedstawiono proces integrowania platformy Kafka Connect z centrum zdarzeń oraz wdrażania podstawowych FileStreamSource łączników i FileStreamSink łączników. Chociaż te łączniki nie są przeznaczone do użytku produkcyjnego, przedstawiają kompleksowe scenariusze platformy Kafka Connect, w którym usługa Azure Event Hubs działa jako broker platformy Kafka.

Uwaga

Ten przykład jest dostępny w witrynie GitHub.

Wymagania wstępne

Aby ukończyć ten przewodnik, upewnij się, że dysponujesz następującymi elementami:

Tworzenie przestrzeni nazw usługi Event Hubs

Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń, zobacz Tworzenie centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).

Klonowanie projektu przykładowego

Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurowanie narzędzia Kafka Connect dla usługi Event Hubs

Przekierowywanie przepływności narzędzia Kafka Connect z platformy Kafka do usługi Event Hubs wymaga minimalnej rekonfiguracji. W poniższym przykładowym pliku connect-distributed.properties pokazano, jak skonfigurować narzędzie Connect do uwierzytelnienia i komunikowania się z punktem końcowym platformy Kafka w usłudze Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Ważne

Zastąp {YOUR.EVENTHUBS.CONNECTION.STRING} element parametry połączenia przestrzeni nazw usługi Event Hubs. Aby uzyskać instrukcje dotyczące uzyskiwania parametry połączenia, zobacz Pobieranie parametry połączenia usługi Event Hubs. Oto przykładowa konfiguracja: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Uruchamianie narzędzia Kafka Connect

W tym kroku proces roboczy narzędzia Kafka Connect został uruchomiony lokalnie w trybie rozproszonym przy użyciu usługi Event Hubs w celu zachowania stanu klastra.

  1. connect-distributed.properties Zapisz plik lokalnie. Zamień wszystkie wartości w nawiasach klamrowych.
  2. Przejdź do lokalizacji platformy Kafka w maszynie.
  3. Uruchom program ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Pojawienie się tekstu 'INFO Finished starting connectors and tasks' oznacza, że interfejs API REST procesu roboczego narzędzia Connect jest gotowy do interakcji.

Uwaga

Platforma Kafka Connect używa interfejsu API AdminClient platformy Kafka do automatycznego tworzenia tematów z zalecanymi konfiguracjami, w tym kompaktowaniem. Z szybkiego sprawdzenia przestrzeni nazw w witrynie Azure Portal wynika, że tematy wewnętrzne procesu roboczego narzędzia Connect zostały utworzone automatycznie.

Tematy wewnętrzne platformy Kafka Connect muszą używać kompaktowania. Zespół usługi Event Hubs nie jest odpowiedzialny za naprawianie nieprawidłowych konfiguracji, jeśli tematy programu Internal Connect są niepoprawnie skonfigurowane.

Tworzenie łączników

W tej sekcji opisano proces konfigurowania FileStreamSource łączników i FileStreamSink ich uruchamiania.

  1. Utwórz katalog dla plików danych wejściowych i wyjściowych.

    mkdir ~/connect-quickstart
    
  2. Utwórz dwa pliki: jeden plik z danymi inicjacyjnymi FileStreamSource , z których łącznik odczytuje, a drugi, do którego zapisuje nasz FileStreamSink łącznik.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Utwórz FileStreamSource łącznik. Zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Centrum zdarzeń connect-quickstart powinno być widoczne w wystąpieniu usługi Event Hubs po uruchomieniu polecenia.

  4. Sprawdź stan łącznika źródła.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Opcjonalnie możesz użyć Eksploratora usługi Service Bus, aby sprawdzić, czy zdarzenia dotarły do tematu connect-quickstart .

  5. Utwórz łącznik FileStreamSink. Ponownie zamień wartości w nawiasach klamrowych na ścieżkę do katalogu głównego.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Sprawdź stan łącznika ujścia.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Sprawdź, czy dane zostały zreplikowane między plikami oraz czy dane są takie same w obu plikach.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Czyszczenie

Platforma Kafka Connect tworzy tematy usługi Event Hubs w celu przechowywania konfiguracji, przesunięć i stanu, które utrzymują się nawet po usunięciu klastra Connect. Jeśli ta trwałość nie jest wymagana, zalecamy usunięcie tych tematów. Możesz również usunąć connect-quickstart usługi Event Hubs, które zostały utworzone podczas tego przewodnika.

Aby dowiedzieć się więcej o usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły: