使用 C# 建立 Azure Synapse 數據總管的事件中樞數據連線 (預覽)
記錄和遙測資料時,Azure Synapse 資料總管是快速且調整彈性高的資料探索服務。 Azure Synapse 資料總管提供事件中樞、IoT 中樞的擷取 (資料載入),及在 blob 容器寫入 blob。
在本文中,您會使用 C# 建立 Azure Synapse 數據總管的事件中樞數據連線。
必要條件
Azure 訂用帳戶。 建立免費的 Azure 帳戶。
使用 Synapse Studio 或 Azure 入口網站建立資料總管集區
建立資料總管資料庫。
在 Synapse Studio 的左側窗格上,選取 [資料]。
選取 + (新增資源) >[資料總管集區],並使用下列資訊:
設定 建議的值 描述 集區名稱 contosodataexplorer 要使用的資料總管集區名稱 名稱 TestDatabase 資料庫名稱在叢集內必須是唯一而不重複。 預設保留期限 365 保證資料持續可供查詢的時間範圍 (天)。 系統會從內嵌資料的時間開始測量時間範圍。 預設快取期間 31 在 SSD 儲存裝置或 RAM 中 (而非長期儲存裝置),讓受到頻繁查詢的資料維持可用狀態的時間範圍 (天)。 選取 [建立] 以建立資料庫。 建立時間通常不到一分鐘。
- 具有擷取數據的事件中樞。
注意
如果 Synapse 工作區使用已啟用資料外流保護的受控虛擬網路,則無法將資料從事件中樞擷取至資料總管集區。
- Visual Studio 2019,下載並使用 免費的 Visual Studio 2019 Community Edition。 在 Visual Studio 設定期間啟用 Azure 開發 。
在測試叢集上建立數據表
建立名為 StormEvents
的數據表,其符合檔案中 StormEvents.csv
數據的架構。
提示
下列代碼段會針對幾乎每個呼叫建立客戶端的實例。 這樣做是為了讓每個代碼段個別執行。 在生產環境中,客戶端實例會重新進入,而且應該視需要保留。 每個 URI 的單一用戶端實例就已足夠,即使使用多個資料庫(可以在命令層級上指定資料庫)。
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
定義擷取對應
將傳入 CSV 數據對應至建立資料表時所使用的數據行名稱。 在該數據表上布建 CSV 數據行對應物件 。
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
安裝 C# NuGet
驗證
若要執行下列範例,您需要可存取資源的Microsoft Entra 應用程式和服務主體。 若要建立免費的Microsoft Entra 應用程式,並在訂用帳戶層級新增角色指派,請參閱 建立Microsoft Entra 應用程式。 您也需要目錄(租使用者)識別碼、應用程式識別碼和客戶端密碼。
新增事件中樞數據連線
下列範例示範如何以程序設計方式新增事件中樞數據連線。 如需使用 Azure 入口網站 新增事件中樞數據連線的相關信息,請參閱連線到事件中樞。
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
設定 | 建議的值 | 欄位描述 |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 您的租用戶識別碼。 也稱為目錄標識碼。 |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 您用來建立資源的訂用帳戶標識碼。 |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | 應用程式可存取租用戶中資源的用戶端標識碼。 |
clientSecret | xxxxxxxx | 應用程式可存取租用戶中資源的客戶端密碼。 |
resourceGroupName | testrg | 包含叢集的資源群組名稱。 |
clusterName | mykustocluster | 您叢集的名稱。 |
databaseName | mykustodatabase | 叢集中目標資料庫的名稱。 |
dataConnectionName | myeventhubconnect | 數據連線所需的名稱。 |
tableName | StormEvents | 目標資料庫中的目標數據表名稱。 |
mappingRuleName | StormEvents_CSV_Mapping | 與目標數據表相關的數據行對應名稱。 |
dataFormat | csv | 訊息的數據格式。 |
eventHubResourceId | 資源識別碼 | 事件中樞的資源標識碼,其中包含要擷取的數據。 |
consumerGroup | $Default | 事件中樞的取用者群組。 |
location | Central US | 數據連線資源的位置。 |
壓縮 | Gzip 或 None | 數據壓縮的類型。 |
產生資料
請參閱產生數據的範例應用程式,並將其傳送至事件中樞。
事件可以包含一或多個記錄,上限為大小限制。 在下列範例中,我們會傳送兩個事件,每個事件都會附加五筆記錄:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
清除資源
若要刪除資料連線,請使用下列命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);