Azure Event Hubs で Apache Kafka Connect のサポートを統合する
Apache Kafka Connect は、Kafka クラスターを通じて、MySQL や HDFS、ファイル システムなどの外部システムに接続し、それらとの間でデータをインポート/エクスポートするためのフレームワークです。 この記事では、Event Hubs と共に Kafka Connect フレームワークを使用する方法について説明します。
この記事では、Kafka Connect をイベント ハブと統合し、基本的な FileStreamSource
および FileStreamSink
コネクタをデプロイする手順について説明します。 これらのコネクタは運用環境での使用を想定したものではありませんが、Azure Event Hubs が Kafka ブローカーとして機能する Kafka Connect のシナリオをエンド ツー エンドで示しています。
Note
このサンプルは GitHub で入手できます。
前提条件
このチュートリアルを完了するには、次の前提条件を満たしている必要があります。
- Azure のサブスクリプション。 アカウントがない場合は、無料アカウントを作成してください。
- Git
- Linux または macOS
- kafka.apache.org から入手できる最新の Kafka リリース
- Apache Kafka 用 Event Hubs の概要に関する記事を読む。
Event Hubs 名前空間を作成します
Event Hubs サービスとの間で送受信を行うには、イベント ハブの名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 Event Hubs の接続文字列と完全修飾ドメイン名 (FQDN) を、後で使用するために取得します。 手順については、「Get an Event Hubs connection string (Event Hubs の接続文字列を取得する)」を参照してください。
サンプル プロジェクトを複製する
Azure Event Hubs リポジトリを複製し、tutorials/connect サブフォルダーに移動します。
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Event Hubs 用に Kafka Connect を構成する
Kafka Connect のスループットを Kafka から Event Hubs にリダイレクトする際に、最小限の再構成が必要となります。 次の connect-distributed.properties
サンプルは、Event Hubs 上の Kafka エンドポイントに対して認証と通信を行うように Connect を構成する方法を示しています。
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
重要
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
などがあります。
Kafka Connect を実行する
この手順では、Event Hubs を使用し、Kafka Connect ワーカーをローカルから分散モードで開始して、クラスターの状態を維持します。
connect-distributed.properties
ファイルをローカルに保存します。 中かっこで囲んだ値はすべて置き換えてください。- お使いのマシン上にある Kafka リリースの場所に移動します。
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
を実行します。'INFO Finished starting connectors and tasks'
が表示されれば、Connect ワーカーの REST API は対話可能な状態です。
Note
Kafka Connect は、Kafka AdminClient API を使用して、圧縮などの推奨される構成を含むトピックを自動的に作成します。 Azure portal で名前空間をざっとチェックすると、Connect ワーカーの内部的なトピックが自動的に作成されていることがわかります。
Kafka Connect の内部トピックでは、圧縮を使用する必要があります。 Connect の内部トピックが正しく構成されていない場合、Event Hubs チームでは不適切な構成を修正する責任を負いません。
コネクタを作成する
このセクションでは、FileStreamSource
および FileStreamSink
コネクタを起動する手順について説明します。
入力データ ファイルと出力データ ファイル用のディレクトリを作成します。
mkdir ~/connect-quickstart
2 つのファイルを作成します。
FileStreamSource
コネクタの読み取り元となる、シード データを含んだファイルと、この例のFileStreamSink
コネクタの書き込み先となるファイルです。seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
FileStreamSource
コネクタを作成します。 中かっこ内の値は、実際のホーム ディレクトリのパスに置き換えてください。curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
このコマンドを実行すると、Event Hubs インスタンスにイベント ハブ
connect-quickstart
が確認できます。ソース コネクタの状態を確認します。
curl -s http://localhost:8083/connectors/file-source/status
必要に応じて Service Bus Explorer を使用して、イベントが
connect-quickstart
トピックに到着したことを確認できます。FileStreamSink コネクタを作成します。 こちらも、中かっこ内の値は、実際のホーム ディレクトリのパスに置き換えてください。
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
シンク コネクタの状態を確認します。
curl -s http://localhost:8083/connectors/file-sink/status
ファイル間でデータがレプリケートされていることと、そのデータが両方のファイル間で一致していることを確認します。
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
クリーンアップ
Kafka Connect は、イベント ハブのトピックを作成することによって、Connect クラスターが停止した後も永続的に存在する構成、オフセット、状態を格納します。 この永続化が必要な場合を除いて、これらのトピックを削除することをお勧めします。 また、このチュートリアルで作成された connect-quickstart
イベント ハブを削除することもできます。
関連するコンテンツ
Kafka 用 Event Hubs の詳細については、次の記事を参照してください。