チュートリアル: Azure Data Lake Storage Gen2 に格納されている Delta テーブルに書き込む
このチュートリアルでは、Stream Analytics ジョブを作成して、Azure Data Lake Storage Gen2 の Delta テーブルに書き込む方法について説明します。 このチュートリアルでは、次の作業を行う方法について説明します。
- イベント ハブにサンプル データを送信するイベント ジェネレーターをデプロイする
- Stream Analytics のジョブの作成
- Delta テーブルを使って Azure Data Lake Storage Gen2 を構成する
- Stream Analytics ジョブの実行
前提条件
始める前に、以下の手順を完了します。
- Azure サブスクリプションをお持ちでない場合は、無料アカウントを作成してください。
- TollApp イベント ジェネレーターを Azure にデプロイし、このリンクを使用して TollApp Azure テンプレートをデプロイします。 "interval" パラメーターを 1 に設定します。 この手順では、新しいリソース グループを作成して使います。
- Data Lake Storage Gen2 アカウントを作成します。
Stream Analytics のジョブの作成
Azure portal にサインインします。
左側のメニューから、 [すべてのサービス] を選択します。
[分析] セクションの [Stream Analytics ジョブ] にマウスを移動し、[+] (プラス) を選びます。
Azure Portal の左上隅にある [リソースの作成] を選択します。
結果の一覧で、 [Analytics]>[Stream Analytics ジョブ] の順に選択します。
[新しい Stream Analytics ジョブ] ページで、次の手順のようにします。
- [サブスクリプション] で、Azure サブスクリプションを選択します。
- [リソース グループ] で、TollApp のデプロイで前に使ったものと同じリソースを選びます。
- [名前] に、ジョブの名前を入力します。 Stream Analytics ジョブ名には、英数字、ハイフン、アンダースコアのみを使用することができます。長さは 3 文字以上 63 文字以下でなければなりません。
- [ホスティング環境] で、[クラウド] が選ばれていることを確認します。
- [ストリーミング ユニット] で、[1] を選びます。 ストリーミング ユニットとは、ジョブの実行に必要なコンピューティング リソースのことです。 ストリーミング ユニットのスケーリングについては、ストリーミング ユニットの理解と調整に関する記事を参照してください。
ページ下部にある [確認と作成] を選択します。
[確認および作成] ページで設定を確認し、[作成] を選んで Stream Analytics ページを作成します。
デプロイのページで、[リソースに移動] を選んで [Stream Analytics ジョブ] ページに移動します。
ジョブの入力を構成する
次の手順では、TollApp のデプロイで作成したイベント ハブを使用してデータを読み取るためにジョブの入力ソースを定義します。
前のセクションで作成した Stream Analytics ジョブを見つけます。
[Stream Analytics ジョブ] の [ジョブ トポロジ] セクションで、 [入力] を選択します。
[+ 入力の追加] と [イベント ハブ] を選びます。
TollApp Azure Template を使用して作成された次の値を入力フォームに入力します。
[入力のエイリアス] に「entrystream」と入力します。
[サブスクリプションから Event Hub を選択する] を選びます。
[サブスクリプション] で、Azure サブスクリプションを選択します。
[イベント ハブの名前空間] には、前のセクションで作成したイベント ハブの名前空間を選択します。
残りの設定では既定のオプションを使用し、[保存] を選択します。
ジョブの出力を構成する
次の手順では、ジョブがデータを書き込むことができる場所として、出力シンクを定義します。 このチュートリアルでは、Azure Data Lake Storage Gen2 の Delta テーブルに出力を書き込みます。
[Stream Analytics ジョブ] の [ジョブ トポロジ] セクションで、 [出力] オプションを選択します。
[+ 出力の追加]>[BLOB ストレージ/ADLS Gen2] を選びます。
出力フォームに次の詳細を入力して、 [保存] を選択します。
[出力のエイリアス] に「DeltaOutput」と入力します。
[サブスクリプションから Blob Storage または ADLS Gen2 を選択する] を選びます。
[サブスクリプション] で、Azure サブスクリプションを選択します。
[ストレージ アカウント] で、作成した ADLS Gen2 アカウント (tollapp で始まるもの) を選びます。
[コンテナー] で、[新規作成] を選び、一意のコンテナー名を指定します。
[イベントのシリアル化の形式] で、[Delta Lake] を選択します。 Delta Lake はここでオプションの 1 つとしてリストに表示されていますが、データ形式ではありません。 Delta Lake では、バージョン管理された Parquet ファイルを使用してデータを格納します。 Delta lake の詳細については、以下を参照してください。
[Delta テーブルのパス] には、「tutorial folder/delta table」と入力します。
残りの設定では既定のオプションを使用し、[保存] を選択します。
クエリを作成する
ここまでで、着信データ ストリームを読み取る Stream Analytics ジョブを設定しました。 次に、リアルタイムでデータを分析するクエリを作成します。 このクエリは、Stream Analytics に固有のいくつかの拡張を含む SQL に似た言語を使います。
次に、左側のメニューで、[ジョブ トポロジ] の下にある [クエリ] を選びます。
クエリ ウィンドウに次のクエリを入力します。 この例では、クエリは Event Hubs からデータを読み取り、選択した値を ADLS Gen2 の Delta テーブルにコピーします。
SELECT State, CarModel.Make, TollAmount INTO DeltaOutput FROM EntryStream TIMESTAMP BY EntryTime
ツール バーの [クエリの保存] を選びます。
Stream Analytics ジョブを開始して出力をチェックする
Azure portal でジョブの概要ページに戻り、[開始] を選びます。
[ジョブの開始] ページで、[ジョブ出力の開始時刻] に [今すぐ] が選ばれていることを確認し、ページの下部にある [開始] を選択します。
数分経ったら、ポータルで、ジョブの出力として構成したストレージ アカウントとコンテナーを見つけます。 コンテナーで指定されたフォルダーに Delta テーブルが表示されるようになりました。 ジョブは初めて開始するときに数分かかり、開始後はデータが到着すると実行され続けます。
リソースをクリーンアップする
リソース グループ、Stream Analytics ジョブ、およびすべての関連するリソースは、不要になったら削除します。 ジョブを削除すると、ジョブによって消費されるストリーミング ユニットに対する課金を回避することができます。 ジョブを後で使用する計画がある場合は、ジョブを停止し、必要なときに再起動することができます。 このジョブを継続して使用しない場合は、以下の手順を使用して、このチュートリアルで作成したすべてのリソースを削除してください。
- Azure Portal の左側のメニューで [リソース グループ] を選択し、作成したリソースの名前を選択します。
- リソース グループのページで [削除] を選択し、削除するリソースの名前をテキスト ボックスに入力してから [削除] を選択します。
次の手順
このチュートリアルでは、単純な Stream Analytics ジョブを作成し、受信データをフィルター処理し、結果を ADLS Gen2 アカウントの Delta テーブルに書き込みました。 Stream Analytics の詳細については、以下を参照してください。