Azure Stream Analytics を使用した Azure Storage Blob から Azure Data Lake Storage Gen1 へのデータ ストリーム
この記事では、Azure Data Lake Storage Gen1 を Azure Stream Analytics ジョブの出力として使用する方法について説明します。 ここでは、Azure Storage BLOB (入力) からデータを読み取り、そのデータを Azure Data Lake Storage Gen1 (出力) に書き込む簡単なシナリオを紹介します。
前提条件
このチュートリアルを読み始める前に、次の項目を用意する必要があります。
Azure サブスクリプション。 Azure 無料試用版の取得に関するページを参照してください。
Azure Storage アカウント。 このアカウントから BLOB コンテナーを使用して、Stream Analytics ジョブ向けのデータを入力します。 このチュートリアルでは、storageforasa という名前のストレージ アカウントと、そのアカウント内に storageforasacontainer という名前のコンテナーを持っていることを前提とします。 コンテナーを作成したら、サンプル データ ファイルをアップロードします。
Data Lake Storage Gen1 アカウント。 「Azure portal で Azure Data Lake Storage Gen1 の使用を開始する」の手順に従ってください。 myadlsg1 という Data Lake Storage Gen1 アカウントを持っているとします。
Stream Analytics のジョブの作成
まず、入力ソースと出力先を含む Stream Analytics ジョブを作成します。 このチュートリアルでは、ソースは Azure BLOB コンテナー、出力先は Data Lake Storage Gen1 です。
Azure Portal にサインオンします。
左側のウィンドウで、 [Stream Analytics ジョブ] をクリックし、 [追加] をクリックします。
Note
ストレージ アカウントと同じリージョンにジョブを作成していることを確認してください。そうでない場合、リージョン間でのデータ転送で追加料金が発生します。
ジョブに BLOB 入力を作成
Stream Analytics ジョブのページを開き、左側のウィンドウで [入力] タブをクリックし、 [追加] をクリックします。
[新しい入力] ブレードで、次の値を指定します。
[入力のエイリアス] で、このジョブ入力の一意の名前を入力します。
[ソースの種類] で、 [データ ストリーム] を選択します。
[ソース] で、 [BLOB ストレージ] を選択します。
[サブスクリプション] で、 [現在のサブスクリプションの BLOB ストレージを使う] を選択します。
[ストレージ アカウント] で、前提条件の一部として作成したストレージ アカウントを選択します。
[コンテナー] で、選択したストレージ アカウント内に作成したコンテナーを選択します。
[イベントのシリアル化の形式] で、 [CSV] を選択します。
[区切り記号] で、 [タブ] を選択します。
[エンコード] で、 [UTF-8] を選択します。
Create をクリックしてください。 これで、ポータルは、この入力を追加して接続をテストします。
ジョブに Data Lake Storage Gen1 出力を作成
Stream Analytics ジョブのページを開き、 [出力] タブをクリックしてから [追加] をクリックし、 [Data Lake Storage Gen1] を選択します。
[新しい出力] ブレードで、次の値を指定します。
- [出力のエイリアス] で、このジョブ出力の一意名を入力します。 クエリの出力をこの Data Lake Storage Gen1 アカウントに出力するためにクエリで使用されるわかりやすい名前です。
- Data Lake Storage Gen1 アカウントへのアクセスを承認することを求められます。 [承認] をクリックします。
[新しい出力] ブレードで、引き続き次の値を指定します。
[アカウント名] で、ジョブ出力の送信先として作成済みの Data Lake Storage Gen1 アカウントを選択します。
[パス プレフィックスのパターン] で、指定した Data Lake Storage Gen1 アカウント内にファイルを書き込むために使用するファイル パスを入力します。
プレフィックス パスで日付トークンを使用する場合は、 [日付形式] でファイルを編成する日付形式を選択できます。
プレフィックス パスで時刻トークンを使用する場合は、 [時刻形式] でファイルを編成する時刻形式を選択できます。
[イベントのシリアル化の形式] で、 [CSV] を選択します。
[区切り記号] で、 [タブ] を選択します。
[エンコード] で、 [UTF-8] を選択します。
Create をクリックしてください。 これで、ポータルは、この出力を追加して接続をテストします。
Stream Analytics ジョブの実行
Stream Analytics ジョブを実行するには、 [クエリ] タブからクエリを実行する必要があります。このチュートリアルでは、次の画面キャプチャに示すように、プレースホルダーをジョブの入力および出力エイリアスで置き換えて、サンプル クエリを実行することができます。
画面上部の 保存 をクリックした後、概要 タブで 開始 をクリックします。 ダイアログ ボックスで [ユーザー設定時刻] を選択し、現在の日付と時刻を設定します。
[開始] をクリックしてジョブを開始します。 ジョブが開始されるまでに数分かかる場合があります。
Blob からデータを取得するジョブをトリガーするには、サンプル データ ファイルを BLOB コンテナーにコピーします。 サンプル データ ファイルは Azure Data Lake Git リポジトリで取得できます。 このチュートリアルでは、vehicle1_09142014.csv ファイルをコピーします。 Azure Storage Explorerなどのさまざまなクライアントを使用して、BLOB コンテナーにデータをアップロードすることができます。
[概要] タブの [監視] で、データがどのように処理されたかを確認します。
最後に、ジョブの出力データが Data Lake Storage Gen1 アカウントで使用可能であることを確認できます。
[データ エクスプローラー] ウィンドウで、Data Lake Storage Gen1 出力設定 (
streamanalytics/job/output/{date}/{time}
) で指定したフォルダー パスに出力が書き込まれていることがわかります。