C# を使用して Azure Synapse Data Explorer 用の Event Hubs データベース接続を作成する (プレビュー)
Azure Synapse Data Explorer は、ログと利用統計情報のための高速で拡張性に優れたデータ探索サービスです。 Azure Synapse Data Explorer では、BLOB コンテナーに書き込まれた Event Hubs、IoT Hub、BLOB からのインジェスト (データの読み込み) を提供します。
この記事では、C# を使用して Azure Synapse Data Explorer 用の Event Hubs データベース接続を作成します。
前提条件
Azure サブスクリプション。 無料の Azure アカウントを作成します。
Synapse Studio または Azure portal を使用して Data Explorer プールを作成します
Data Explorer データベースを作成します。
Synapse Studio の左側のペインで、 [データ] を選択します。
+ (新しいリソースの追加) >[Data Explorer プール] を選択し、次の情報を使用します。
設定 提案された値 説明 プール名 contosodataexplorer 使用する Data Explorer プールの名前 Name TestDatabase データベース名はクラスター内で一意である必要があります。 既定のリテンション期間 365 クエリにデータを使用できることが保証される期間 (日数) です。 期間は、データが取り込まれた時点から測定されます。 既定のキャッシュ期間 31 頻繁にクエリされるデータが、長期ストレージではなく SSD ストレージまたは RAM で利用できるように保持される期間 (日数) です。 [作成] を選択してデータベースを作成します。 通常、作成にかかる時間は 1 分未満です。
Note
Synapse ワークスペースでデータ流出防止が有効になっているマネージド仮想ネットワークを使用している場合、イベント ハブから Data Explorer プールへのデータの取り込みは機能しません。
- Visual Studio 2019。無料の Visual Studio 2019 Community Edition をダウンロードして使用します。 Visual Studio のセットアップ中に、 [Azure の開発] を有効にしてください。
テスト クラスターにテーブルを作成する
StormEvents.csv
ファイル内のデータのスキーマと一致する、StormEvents
という名前のテーブルを作成します。
ヒント
次のコード スニペットは、ほぼすべての呼び出しに対してクライアントのインスタンスを作成します。 これは、各スニペットを個別に実行できるようにするためです。 実稼働環境では、クライアント インスタンスは再入可能であり、必要な限り保持する必要があります。 複数のデータベースを使用する場合でも、URI ごとに 1 つのクライアントインスタンスで十分です (データベースはコマンド レベルで指定できます)。
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
インジェストのマッピングを定義する
受信した CSV データを、テーブル作成時に使用される列名にマップします。 CSV 列マッピング オブジェクトをそのテーブルにプロビジョニングします。
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
C# NuGet をインストールする
- Microsoft.Azure.Management.Kusto NuGet パッケージをインストールします。
認証
次の例を実行するには、リソースにアクセスできる Microsoft Entra アプリケーションとサービス プリンシパルが必要です。 無料の Microsoft Entra アプリケーションを作成し、サブスクリプション レベルでロールの割り当てを追加するには、Microsoft Entra アプリケーションの作成に関する記事を参照してください。 また、ディレクトリ (テナント) ID、アプリケーション ID、およびクライアント シークレットも必要です。
Event Hubs データ接続の追加
次の例は、Event Hubs データ接続をプログラムを使用して追加する方法を示しています。 Azure portal を使用して Event Hubs データベース接続を追加する方法については、「Event Hubs への接続」を参照してください。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
設定 | 推奨値 | フィールドの説明 |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | テナント ID。 ディレクトリ ID とも呼ばれます。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | リソースの作成に使用するサブスクリプション ID。 |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント ID。 |
clientSecret | xxxxxxxxxxxxxx | ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント シークレット。 |
resourceGroupName | testrg | ご利用のクラスターを含むリソース グループの名前。 |
clusterName | mykustocluster | ご利用のクラスターの名前。 |
databaseName | mykustodatabase | ご利用のクラスター内のターゲット データベースの名前。 |
dataConnectionName | myeventhubconnect | データ接続の任意の名前。 |
tableName | StormEvents | ターゲット データベース内のターゲット テーブルの名前。 |
mappingRuleName | StormEvents_CSV_Mapping | ターゲット テーブルに関連付けられている列マッピングの名前。 |
dataFormat | csv | メッセージのデータ形式。 |
eventHubResourceId | リソース ID | インジェスト用のデータを保持しているイベント ハブのリソース ID。 |
consumerGroup | $Default | ご利用のイベント ハブのコンシューマー グループ。 |
location | 米国中部 | データ接続リソースの場所。 |
compression | Gzip または None | データ圧縮の種類。 |
データの生成
データを生成してイベント ハブに送信するサンプル アプリをご覧ください。
イベントには、そのサイズ制限を上限として、1 つ以上のレコードを含めることができます。 次の例では、2 つのイベントを送信します。それぞれに 5 つのレコードが追加されています。
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
リソースをクリーンアップする
データ接続を削除するには、次のコマンドを使用します。
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);