Delta Lake 形式で Event Hubs からデータをキャプチャする
この記事では、ノー コード エディターを使用して、Azure Data Lake Storage Gen2 アカウントの Event Hubs のストリーミング データを Delta Lake 形式で自動的にキャプチャする方法について説明します。
前提条件
- お使いの Azure Event Hubs と Azure Data Lake Storage Gen2 リソースはパブリックにアクセスできる必要があり、ファイアウォールの内側に置いたり、Azure Virtual Network でセキュリティ保護したりすることはできません。
- Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。
データをキャプチャするようにジョブを構成する
Azure Data Lake Storage Gen2 内のデータをキャプチャするように Stream Analytics ジョブを構成するには、次の手順に従います。
Azure portal で、イベント ハブに移動します。
[機能] >[データの処理] の順に選択し、[ADLS Gen2 へのデータを Delta Lake 形式でキャプチャする] カードの [開始] を選択します。
または、[機能]>[キャプチャ] を選択し、[出力イベントのシリアル化形式] で [Delta Lake] オプションを選択し、[データ キャプチャ構成の開始] を選択します。
Event Hubs でのデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。
接続が正常に確立されると、次の情報が表示されます。
[Azure Data Lake Storage Gen2] タイルを選択して構成を編集します。
[Azure Data Lake Storage Gen2] 構成ページで、次の手順を行います。
ドロップダウン メニューから [サブスクリプション]、[ストレージ アカウント名]、[コンテナー] を選択します。
[サブスクリプション] が選択されると、[認証方法] と [ストレージ アカウント キー] が自動的に入力されます。
Delta テーブル パスは、Azure Data Lake Storage Gen2 に格納されている Delta Lake テーブルの場所と名前を指定するために使用されます。 1 つ以上のパス セグメントを使用して、Delta テーブルへのパスと Delta テーブル名を定義するよう選択できます。 詳細については、Delta Lake テーブルへの書き込みに関するページを参照してください。
[接続] を選択します。
接続が確立されると、出力データに存在するフィールドが表示されます。
コマンド バーで 「保存」 を選択して、構成を保存します。
コマンド バーで [開始] を選択して、データをキャプチャするストリーミング フローを開始します。 次に、[Stream Analytics ジョブの開始] ウィンドウで次の手順を行います。
- 出力開始時刻を選択します。
- ジョブを実行するストリーミング ユニット (SU) の数を選択します。 SU は、Stream Analytics ジョブを実行するために割り当てられているコンピューティング リソースを表しています。 詳細については、Azure Stream Analytics のストリーミング ユニットに関するページを参照してください。
[開始] を選択すると、2 分以内にジョブの実行が開始され、次の図に示すように、タブ セクションにメトリックが表示されます。
出力の確認
Delta lake 形式の Parquet ファイルが Azure Data Lake Storage コンテナーに生成されていることを確認します。
Event Hubs の geo レプリケーション機能を使用する場合の考慮事項
Azure Event Hubs では最近、geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の Geo ディザスター リカバリー 機能とは異なります。
フェールオーバーの種類が [適用]、レプリケーションの整合性が [非同期]である場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。
Event Hubs を出力とするプロデューサーである Azure Stream Analytics が、フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、ジョブでウォーターマーク遅延を検出する場合があります。
Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを見つけようとする場合があります。
これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。
次のステップ
これで、Stream Analytics のノー コード エディターを使用して、Event Hubs のデータを Delta lake 形式で Azure Data Lake Storage Gen2 にキャプチャするジョブを作成する方法を確認しました。 次は、Azure Stream Analytics の詳細と、作成したジョブを監視する方法について学習します。