Sdílet prostřednictvím


Integrace podpory připojení Apache Kafka ve službě Azure Event Hubs

Apache Kafka Connect je architektura pro připojení a import/export dat z/do libovolného externího systému, jako je MySQL, HDFS a systém souborů prostřednictvím clusteru Kafka. Tento článek vás provede používáním architektury Kafka Connect se službou Event Hubs.

Tento článek vás provede integrací služby Kafka Connect s centrem událostí a nasazením základních FileStreamSource konektorů a FileStreamSink konektorů. I když tyto konektory nejsou určené pro produkční použití, předvádějí kompletní scénář připojení Kafka, ve kterém služba Azure Event Hubs funguje jako zprostředkovatel Kafka.

Poznámka:

Tato ukázka je k dispozici na GitHubu.

Požadavky

Abyste mohli dokončit tento návod, ujistěte se, že máte následující:

Vytvoření oboru názvů služby Event Hubs

K odesílání do jakékoli služby Event Hubs a příjmu z ní se vyžaduje obor názvů služby Event Hubs. Pokyny k vytvoření oboru názvů a centra událostí najdete v tématu Vytvoření centra událostí. Získejte plně kvalifikovaný název domény a připojovací řetězec služby Event Hubs pro pozdější použití. Pokyny najdete v tématu Získání připojovacího řetězce služby Event Hubs.

Naklonování ukázkového projektu

Naklonujte úložiště Azure Event Hubs a přejděte do podsložky tutorials/connect:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurace připojení Kafka pro službu Event Hubs

Přesměrování propustnosti připojení Kafka z Kafka do služby Event Hubs vyžaduje minimální konfiguraci. Následující ukázka souboru connect-distributed.properties znázorňuje, jak pro připojení nakonfigurovat ověřování a komunikaci s koncovým bodem Kafka ve službě Event Hubs:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Důležité

Nahraďte {YOUR.EVENTHUBS.CONNECTION.STRING} připojovací řetězec pro obor názvů služby Event Hubs. Pokyny k získání připojovací řetězec najdete v tématu Získání připojovací řetězec služby Event Hubs. Tady je příklad konfigurace: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Spuštění připojení Kafka

V tomto kroku se místně spustí pracovní proces připojení Kafka v distribuovaném režimu a k zachování stavu clusteru s použije služba Event Hubs.

  1. connect-distributed.properties Uložte soubor místně. Nezapomeňte nahradit všechny hodnoty v závorkách.
  2. Na svém počítači přejděte do umístění verze Kafka.
  3. Spusťte ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. Jakmile se zobrazí 'INFO Finished starting connectors and tasks', rozhraní REST API pracovního procesu připojení je připravené k interakci.

Poznámka:

Kafka Connect používá rozhraní Kafka AdminClient API k automatickému vytváření témat s doporučenými konfiguracemi, včetně komprimace. Rychlou kontrolou oboru názvů na webu Azure Portal zjistíte, že se interní témata pracovního procesu připojení vytvořila automaticky.

Interní témata kafka Connect musí používat komprimace. Tým Event Hubs neodpovídá za opravu nesprávných konfigurací, pokud jsou interní témata připojení nesprávně nakonfigurovaná.

Vytvoření konektorů

Tato část vás provede otáčením FileStreamSource a FileStreamSink konektory.

  1. Vytvořte adresář pro vstupní a výstupní datové soubory.

    mkdir ~/connect-quickstart
    
  2. Vytvořte dva soubory: jeden soubor s počátečními daty, ze kterých FileStreamSource konektor čte, a druhý soubor, do kterého náš FileStreamSink konektor zapisuje.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Vytvoření konektoru FileStreamSource Nezapomeňte nahradit složené závorky cestou k vašemu domovskému adresáři.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    Po spuštění příkazu byste měli vidět centrum connect-quickstart událostí ve vaší instanci služby Event Hubs.

  4. Zkontrolujte stav konektoru zdroje.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Volitelně můžete pomocí Service Bus Exploreru ověřit, že události dorazí do connect-quickstart tématu.

  5. Vytvořte konektor FileStreamSink. Nezapomeňte opět nahradit složené závorky cestou k vašemu domovskému adresáři.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Zkontrolujte stav konektoru jímky.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Ověřte, že se data replikovala mezi soubory a že jsou v obou souborech identická.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Vyčištění

Kafka Connect vytváří témata služby Event Hubs pro ukládání konfigurací, posunů a stavu, které se uchovávají i po ukončení clusteru Connect. Pokud tato trvalost není požadovaná, doporučujeme tato témata odstranit. Můžete také odstranit connect-quickstart službu Event Hubs, kterou jste vytvořili v tomto názorném postupu.

Další informace o službě Event Hubs pro Kafka najdete v následujících článcích: