Vytvoření datového připojení služby Event Hubs pro Azure Synapse Data Explorer pomocí jazyka C# (Preview)
Azure Synapse Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Azure Synapse Data Explorer nabízí příjem dat (načítání dat) ze služby Event Hubs, IoT Hubs a objektů blob zapsaných do kontejnerů objektů blob.
V tomto článku vytvoříte datové připojení služby Event Hubs pro Azure Synapse Data Explorer pomocí jazyka C#.
Požadavky
Předplatné Azure. Vytvořte bezplatný účet Azure.
Vytvoření fondu Průzkumníka dat pomocí nástroje Synapse Studio nebo webu Azure Portal
Vytvořte databázi Průzkumníka dat.
V nástroji Synapse Studio v levém podokně vyberte Data.
Vyberte + fond Průzkumníka dat (přidat nový zdroj) >a použijte následující informace:
Nastavení Navrhovaná hodnota Popis Název fondu contosodataexplorer Název fondu Průzkumníka dat, který se má použít Název TestDatabase Název databáze musí být v rámci clusteru jedinečný. Výchozí doba uchovávání 365 Časové období (ve dnech), pro které je zaručeno, že jsou data k dispozici pro dotazování. Časový rozsah se začíná měřit od okamžiku, kdy jsou data ingestována. Výchozí období mezipaměti 31 Časové období (ve dnech), pro které se mají uchovávat často dotazovaná data dostupná v úložišti SSD nebo paměti RAM, a ne v dlouhodobějším úložišti. Výběrem možnosti Vytvořit vytvořte databázi. Vytvoření obvykle trvá méně než minutu.
Poznámka:
Ingestování dat z centra událostí do fondů Průzkumníka dat nebude fungovat, pokud váš pracovní prostor Synapse používá spravovanou virtuální síť s povolenou ochranou před exfiltrací dat.
- Visual Studio 2019, stáhněte a používejte bezplatnou sadu Visual Studio 2019 Community Edition. Povolte vývoj pro Azure během instalace sady Visual Studio.
Vytvoření tabulky v testovacím clusteru
Vytvořte tabulku s názvem StormEvents
, která odpovídá schématu StormEvents.csv
dat v souboru.
Tip
Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definování mapování ingestace
Namapujte příchozí data CSV na názvy sloupců použitých při vytváření tabulky. Zřiďte objekt mapování sloupců CSV v této tabulce.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalace balíčku NuGet v jazyce C#
- Nainstalujte balíček NuGet Microsoft.Azure.Management.Kusto.
Ověřování
Ke spuštění následujícího příkladu potřebujete aplikaci Microsoft Entra a instanční objekt, který má přístup k prostředkům. Pokud chcete vytvořit bezplatnou aplikaci Microsoft Entra a přidat přiřazení role na úrovni předplatného, přečtěte si téma Vytvoření aplikace Microsoft Entra. Potřebujete také ID adresáře (tenanta), ID aplikace a tajný klíč klienta.
Přidání datového připojení služby Event Hubs
Následující příklad ukazuje, jak přidat datové připojení Event Hubs programově. Informace o přidání datového připojení služby Event Hubs pomocí webu Azure Portal najdete v tématu připojení ke službě Event Hubs .
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Nastavení | Navrhovaná hodnota | Popis pole |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID vašeho tenanta Označuje se také jako ID adresáře. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID předplatného, které používáte k vytvoření prostředku. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID klienta aplikace, která má přístup k prostředkům ve vašem tenantovi. |
clientSecret | xxxxxxxxxxxxxxxx | Tajný klíč klienta aplikace, která má přístup k prostředkům ve vašem tenantovi. |
resourceGroupName | testrg | Název skupiny prostředků obsahující váš cluster. |
clusterName | mykustocluster | Název clusteru. |
databaseName | mykustodatabase | Název cílové databáze v clusteru |
dataConnectionName | myeventhubconnect | Požadovaný název datového připojení. |
tableName | StormEvents | Název cílové tabulky v cílové databázi. |
mappingRuleName | StormEvents_CSV_Mapping | Název mapování sloupců související s cílovou tabulkou. |
dataFormat | csv | Formát dat zprávy. |
eventHubResourceId | ID prostředku | ID prostředku vašeho centra událostí, které obsahuje data pro příjem dat. |
consumerGroup | $Default | Skupina příjemců vašeho centra událostí. |
location | USA – střed | Umístění prostředku datového připojení |
komprese | Gzip nebo None | Typ komprese dat. |
Generovat data
Podívejte se na ukázkovou aplikaci , která generuje data a odesílá je do centra událostí.
Událost může obsahovat jeden nebo více záznamů až do limitu velikosti. V následující ukázce odesíláme dvě události, každý má připojených pět záznamů:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Vyčištění prostředků
Pokud chcete datové připojení odstranit, použijte následující příkaz:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);