チュートリアル: Azure Event Grid と Azure Functions を使用して、Event Hubs でキャプチャされたデータを Azure Storage から Azure Synapse Analytics に移行する
このチュートリアルでは、Azure Event Grid と Azure Functions を使用して、Event Hubs でキャプチャされたデータを Azure Blob Storage から Azure Synapse Analytics (具体的には専用 SQL プール) に移行します。
この図は、このチュートリアルでお客様が作成するソリューションのワークフローを示しています。
- Azure イベント ハブに送信されたデータが Azure BLOB ストレージにキャプチャされます。
- データのキャプチャが完了すると、イベントが生成されて Azure Event Grid に送信されます。
- Azure Event Grid は、このイベント データを Azure 関数アプリに転送します。
- 関数アプリによってイベント データの BLOB URL が使用され、ストレージから BLOB が取得されます。
- 関数アプリによって BLOB データが Azure Synapse Analytics に移行されます。
この記事では、次の手順を実行します。
- チュートリアルに必要なインフラストラクチャをデプロイする
- 関数アプリにコードを発行する
- Event Grid のサブスクリプションを作成する
- サンプル データを Event Hubs にストリーム配信する
- キャプチャされたデータを Azure Synapse Analytics で確認する
前提条件
このチュートリアルを完了するには、以下が必要です。
- この記事では、Event Grid と Event Hubs (特にキャプチャ機能) について理解していることを前提としています。 Azure Event Grid に慣れていない場合は、「Azure Event Grid とは」をご覧ください。 Azure Event Hubs のキャプチャ機能について、詳しくは「Azure Event Hubs で Azure Blob Storage または Azure Data Lake Storage にイベントをキャプチャする」をご覧ください。
- Azure サブスクリプション。 Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
- .NET デスクトップ開発、Azure 開発、ASP.NET および Web 開発、Node.js 開発、Python 開発のワークロードを含む Visual Studio。
- 自分のコンピューターに EventHubsCaptureEventGridDemo サンプル プロジェクトをダウンロードします。
- WindTurbineDataGenerator - Capture 機能を有効にしてサンプルの風力タービン データをイベント ハブに送信するシンプルな発行元。
- FunctionDWDumper - Avro ファイルが Azure Storage BLOB に取り込まれたときに Azure Event Grid から通知を受信する Azure 関数。 BLOB の URI パスを受信し、その内容を読み取り、そのデータを Azure Synapse Analytics (専用 SQL プール) にプッシュします。
インフラストラクチャをデプロイする
この手順では、Resource Manager テンプレートを使用して必要なインフラストラクチャをデプロイします。 テンプレートをデプロイすると、次のリソースが作成されます。
- Capture 機能が有効なイベント ハブ
- キャプチャされたファイル用のストレージ アカウント
- 関数アプリをホストするための App Service プラン
- イベントを処理するための関数アプリ
- データ ウェアハウスをホストするための SQL Server
- 移行したデータを格納するための Azure Synapse Analytics (専用 SQL プール)
Azure CLI を使用してインフラストラクチャをデプロイする
Azure portal にサインインします。
上部にある [Cloud Shell] ボタンを選択します。
ブラウザーの下部に Cloud Shell が開かれるのがわかります。
Cloud Shell で、上の図に示すように [Bash] を選択します (まだ選択されていない場合)。
次の CLI コマンドを実行して、Azure リソース グループを作成します。
次のコマンドをコピーして Cloud Shell ウィンドウに貼り付けます。 必要に応じて、リソース グループの名前と場所を変更します。
az group create -l eastus -n rgDataMigration
Enterキーを押します。
次に例を示します。
user@Azure:~$ az group create -l eastus -n rgDataMigration { "id": "/subscriptions/00000000-0000-0000-0000-0000000000000/resourceGroups/rgDataMigration", "location": "eastus", "managedBy": null, "name": "rgDataMigration", "properties": { "provisioningState": "Succeeded" }, "tags": null }
次の CLI コマンドを実行して、前のセクションで説明したリソース (イベント ハブ、ストレージ アカウント、関数アプリ、Azure Synapse Analytics) をすべてデプロイします。
コマンドをコピーして Cloud Shell ウィンドウに貼り付けます。 または、お好みのエディターにコピーして貼り付け、値を設定してから、コマンドを Cloud Shell にコピーすることもできます。 Azure リソース名が原因でエラーが発生した場合は、リソース グループを削除し、名前を修正して、コマンドを再試行してください。
重要
コマンドを実行する前に、次のエンティティの値を指定します。
- お客様が先ほど作成したリソース グループの名前。
- イベント ハブ名前空間の名前。
- イベント ハブの名前。 値はそのまま (hubdatamigration) でもかまいません。
- SQL サーバーの名前。
- SQL ユーザーの名前とパスワード。
- データベースの名前。
- ストレージ アカウントの名前。
- 関数アプリの名前。
az deployment group create \ --resource-group rgDataMigration \ --template-uri https://raw.githubusercontent.com/Azure/azure-docs-json-samples/master/event-grid/EventHubsDataMigration.json \ --parameters eventHubNamespaceName=<event-hub-namespace> eventHubName=hubdatamigration sqlServerName=<sql-server-name> sqlServerUserName=<user-name> sqlServerPassword=<password> sqlServerDatabaseName=<database-name> storageName=<unique-storage-name> functionAppName=<app-name>
Cloud Shell ウィンドウで Enter キーを押して、コマンドを実行します。 多数のリソースを作成するため、このプロセスには時間がかかる場合があります。 コマンドの結果で、エラーがないことを確認します。
ポータルの [Cloud Shell] ボタンまたは Cloud Shell ウィンドウの右上隅にある [X] ボタンを選択して Cloud Shell を閉じます。
リソースが作成されたことを確認する
Azure portal で、左側のメニューにある [リソース グループ] を選択します。
お客様のリソース グループの名前を検索ボックスに入力して、リソース グループの一覧をフィルター処理します。
一覧で、お客様のリソース グループを選択します。
次のリソースがリソース グループに表示されていることを確認します。
Azure Synapse Analytics でテーブルを作成する
このセクションでは、前に作成した専用 SQL プールにテーブルを作成します。
リソース グループ内のリソースの一覧で、専用 SQL プールを選択します。
[専用 SQL プール] ページにある左側のメニューの [主なタスク] セクションで、 [クエリ エディター (プレビュー)] を選択します。
SQL サーバーのユーザーの名前とパスワードを入力し、 [OK] を選択します。 クライアントによる SQL サーバーへのアクセスを許可するメッセージが表示された場合は、[SQL サーバー <your SQL server> の許可リスト IP <your IP Address>] を選択し、[OK] を選択します。
クエリ ウィンドウに、次の SQL スクリプトをコピーして実行します。
CREATE TABLE [dbo].[Fact_WindTurbineMetrics] ( [DeviceId] nvarchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, [MeasureTime] datetime NULL, [GeneratedPower] float NULL, [WindSpeed] float NULL, [TurbineSpeed] float NULL ) WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);
データが作成されたことをチュートリアルの最後に確認できるように、このタブまたはウィンドウを開いたままにしておきます。
Azure Functions アプリを発行する
最初に、Azure portal から Functions アプリの発行プロファイルを取得します。 次に、発行プロファイルを使用して、Visual Studio から Azure Functions プロジェクトまたはアプリを発行します。
発行プロファイル名の取得
[リソース グループ] ページのリソースの一覧で、Azure Functions アプリを選択します。
アプリの [Function App] (Function アプリ) ページで、コマンド バーの [発行プロファイルの取得] を選択します。
ファイルをダウンロードして、EventHubsCaptureEventGridDemo フォルダーの FunctionEGDDumper サブフォルダーに保存します。
発行プロファイルを使用して Functions アプリを発行する
Visual Studio を起動します。
お客様が前提条件の一環として GitHub からダウンロードした EventHubsCaptureEventGridDemo.sln ソリューションを開きます。 これは
/samples/e2e/EventHubsCaptureEventGridDemo
フォルダーにあります。ソリューション エクスプローラーで、 [FunctionEGDWDumper] プロジェクトを右クリックし、 [発行] を選択します。
次の画面で、[開始] または [発行プロファイルの追加] を選択します。
[発行] ダイアログ ボックスの [対象] で [プロファイルのインポート] を選択し、[次へ] を選択します。
[プロファイルのインポート] タブで、先ほど FunctionEGDWDumper フォルダーに保存した発行設定ファイルを選択し、[完了] を選択します。
Visual Studio でプロファイルを構成している場合は、 [発行] を選択します。 発行が成功したことを確認します。
Web ブラウザーで [Azure Function] ページを開いて、中央のペインで [Functions] を選びます。 EventGridTriggerMigrateData 関数が一覧に表示されていることを確認します。 表示されない場合は、もう一度 Visual Studio から発行し、ポータルのページを最新の状態に更新してください。
関数を発行すれば、イベントをサブスクライブする準備が整います。
イベントをサブスクライブする
Web ブラウザーの新しいタブまたはウィンドウで Azure portal にサインインします。
Azure portal で、左側のメニューにある [リソース グループ] を選択します。
お客様のリソース グループの名前を検索ボックスに入力して、リソース グループの一覧をフィルター処理します。
一覧で、お客様のリソース グループを選択します。
リソースの一覧から [Event Hubs 名前空間] を選択します。
[Event Hubs 名前空間] ページで、左側のメニューの [イベント] を選択し、ツール バーの [+ イベント サブスクリプション] を選択します。
[イベント サブスクリプションの作成] ページで、次の手順に従います。
イベント サブスクリプションの名前を入力します。
システム トピックの名前を入力します。 システム トピックは、送信者がイベントを送信するためのエンドポイントを提供します。 詳細については、システム トピックに関するページを参照してください
[エンドポイントのタイプ] で、 [Azure 関数] を選択します。
[エンドポイント] で、リンクを選択します。
[Azure 関数の選択] ページで、次のものが自動的に入力されない場合は、次の手順に従います。
- Azure 関数のある Azure サブスクリプションを選択します。
- 関数のリソース グループを選択します。
- 関数アプリを選択します。
- デプロイ スロットを選択します。
- EventGridTriggerMigrateData 関数を選択します。
[Azure 関数の選択] ページで、 [選択の確認] を選択します。
次に、 [イベント サブスクリプションの作成] ページに戻り、 [作成] を選択します。
イベント サブスクリプションが作成されていることを確認します。 Event Hubs 名前空間の [イベント] ページで [イベント サブスクリプション] タブに切り替えます。
データを生成するアプリを実行する
イベント ハブ、専用 SQL プール (以前の SQL Data Warehouse)、Azure 関数アプリ、およびイベント サブスクリプションの設定が完了しました。 イベント ハブのデータを生成するアプリケーションを実行する前に、いくつかの値を構成する必要があります。
Azure portal で、先ほど移動したお客様のリソース グループに移動します。
Event Hubs 名前空間を選択します。
[Event Hubs 名前空間] ページで、左側のメニューの [共有アクセス ポリシー] を選択します。
ポリシーの一覧で RootManageSharedAccessKey を選択します。
[接続文字列 - 主キー] テキスト ボックスの隣にあるコピー ボタンを選択します。
お客様の Visual Studio ソリューションに戻ります。
[WindTurbineDataGenerator] プロジェクトを右クリックし、 [スタートアップ プロジェクトとして設定] を選択します。
WindTurbineDataGenerator プロジェクトで、program.cs を開きます。
<EVENT HUBS NAMESPACE CONNECTION STRING>
をポータルからコピーした接続文字列に置き換えます。イベント ハブに対して
hubdatamigration
以外の名前を使用した場合は、<EVENT HUB NAME>
をそのイベント ハブの名前に置き換えます。private const string EventHubConnectionString = "Endpoint=sb://demomigrationnamespace.servicebus.windows.net/..."; private const string EventHubName = "hubdatamigration";
ソリューションをビルドします。 WindTurbineGenerator.exe アプリケーションを実行します。
数分後に、クエリ ウィンドウが開いている他のブラウザー タブで、データ ウェアハウスのテーブルに対してクエリを実行して、移行されたデータを確認します。
select * from [dbo].[Fact_WindTurbineMetrics]
重要
チュートリアルが複雑にならないように、接続文字列を使用して Azure Event Hubs 名前空間に対する認証を行います。 運用環境では Microsoft Entra ID 認証を使用することをお勧めします。 アプリケーションを使用する場合は、アプリケーションのマネージド ID を有効にし、Event Hubs 名前空間で ID に適切なロール (Azure Event Hubs 所有者、Azure Event Hubs データ送信者、または Azure Event Hubs データ受信者) を割り当てることができます。 詳しくは、Microsoft Entra ID を使用した Event Hubs へのアクセスの認可に関する記事をご覧ください。
ソリューションを監視する
このセクションは、ソリューションの監視またはトラブルシューティングに役立ちます。
ストレージ アカウントでキャプチャされたデータを表示する
リソース グループに移動し、イベント データのキャプチャに使用するストレージ アカウントを選択します。
[ストレージ アカウント] ページで、左側のメニューから [ストレージ ブラウザー] を選びます。
[BLOB コンテナー] を展開し、 [windturbinecapture] を選択します。
右側のペインで、対象の Event Hubs 名前空間と同じ名前のフォルダーを開きます。
対象のイベント ハブと同じ名前のフォルダー (hubdatamigration) を開きます。
フォルダーをドリルスルーすると、AVRO ファイルが表示されます。 次に例を示します。
Event Grid トリガーによって関数が呼び出されたことを確認する
リソース グループに移動し、関数アプリを選択します。
中央のペインで [Functions] タブを選びます。
一覧から EventGridTriggerMigrateData 関数を選択します。
[関数] ページで、左側のメニューの [監視] を選択します。
呼び出しログをキャプチャするアプリケーション分析情報を構成するために、 [構成] を選択します。
新しい Application Insights リソースを作成するか、既存のリソースを選択します。
関数の [監視] ページに戻ります。
イベントを送信しているクライアント アプリケーション (WindTurbineDataGenerator) がまだ実行されていることを確認します。 実行されていない場合は、アプリを実行します。
数分 (5 分以上) 待ってから、 [最新の情報に更新] ボタンを選択して、関数の呼び出しを確認します。
呼び出しを選択すると、詳細が表示されます。
Event Grid により、イベント データがサブスクライバーに配信されます。 次の例は、イベント ハブを介したデータ ストリーミングが BLOB にキャプチャされるときに生成されるデータを示します。 特に、
data
オブジェクトのfileUrl
プロパティがストレージ内の BLOB を指すことに注意してください。 関数アプリによってこの URL が使用され、キャプチャされたデータを含む BLOB ファイルが取得されます。{ "topic": "/subscriptions/<AZURE SUBSCRIPTION ID>/resourcegroups/rgDataMigration/providers/Microsoft.EventHub/namespaces/spehubns1207", "subject": "hubdatamigration", "eventType": "Microsoft.EventHub.CaptureFileCreated", "id": "4538f1a5-02d8-4b40-9f20-36301ac976ba", "data": { "fileUrl": "https://spehubstorage1207.blob.core.windows.net/windturbinecapture/spehubns1207/hubdatamigration/0/2020/12/07/21/49/12.avro", "fileType": "AzureBlockBlob", "partitionId": "0", "sizeInBytes": 473444, "eventCount": 2800, "firstSequenceNumber": 55500, "lastSequenceNumber": 58299, "firstEnqueueTime": "2020-12-07T21:49:12.556Z", "lastEnqueueTime": "2020-12-07T21:50:11.534Z" }, "dataVersion": "1", "metadataVersion": "1", "eventTime": "2020-12-07T21:50:12.7065524Z" }
データが専用 SQL プールに格納されていることを確認する
クエリ ウィンドウが開いているブラウザー タブで、専用 SQL プールのテーブルに対してクエリを実行して、移行されたデータを確認します。
次のステップ
- サンプルの設定と実行に関する詳細については、Event Hubs Capture と Event Grid のサンプルに関する記事をご覧ください。
- このチュートリアルでは、
CaptureFileCreated
イベントに対するイベント サブスクリプションを作成しました。 このイベントと Azure Blob Storage がサポートするすべてのイベントの詳細については、「Event Grid ソースとしての Azure Event Hub」を参照してください。 - Event Hubs のキャプチャ機能の詳細については、「Azure Event Hubs で Azure Blob Storage または Azure Data Lake Storage にイベントをキャプチャする」を参照してください。