Tworzenie połączenia danych usługi Event Hubs dla usługi Azure Synapse Data Explorer przy użyciu języka C# (wersja zapoznawcza)
Usługa Azure Synapse Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dzienników i danych telemetrycznych. Usługa Azure Synapse Data Explorer oferuje pozyskiwanie danych (ładowanie danych) z usług Event Hubs, IoT Hubs i obiektów blob zapisanych w kontenerach obiektów blob.
W tym artykule utworzysz połączenie danych usługi Event Hubs dla usługi Azure Synapse Data Explorer przy użyciu języka C#.
Wymagania wstępne
Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure.
Tworzenie puli eksploratora danych przy użyciu programu Synapse Studio lub witryny Azure Portal
Utwórz bazę danych eksploratora danych.
W programie Synapse Studio w okienku po lewej stronie wybierz pozycję Dane.
Wybierz + pozycję (Dodaj nowy zasób) >pulę Eksploratora danych i użyj następujących informacji:
Ustawienie Sugerowana wartość opis Nazwa puli contosodataexplorer Nazwa puli Eksploratora danych do użycia Nazwisko TestDatabase Nazwa bazy danych musi być unikatowa w obrębie klastra. Domyślny okres przechowywania 365 Przedział czasu (w dniach), w którym gwarantowana jest dostępność danych dla zapytania. Przedział czasu jest mierzony od momentu pozyskania danych. Domyślny okres pamięci podręcznej 31 Przedział czasu (w dniach), w którym często używane w zapytaniach dane mają być dostępne na dysku SSD lub w pamięci RAM zamiast w magazynie długoterminowym. Wybierz pozycję Utwórz, aby utworzyć bazę danych. Tworzenie zazwyczaj zajmuje mniej niż minutę.
Uwaga
Pozyskiwanie danych z centrum zdarzeń do pul eksploratora danych nie będzie działać, jeśli obszar roboczy usługi Synapse używa zarządzanej sieci wirtualnej z włączoną ochroną eksfiltracji danych.
- Program Visual Studio 2019 pobierz i użyj bezpłatnej wersji Visual Studio 2019 Community Edition. Włącz programowanie na platformie Azure podczas konfigurowania programu Visual Studio.
Tworzenie tabeli w klastrze testowym
Utwórz tabelę o nazwie StormEvents
, która będzie zgodna ze schematem danych w pliku StormEvents.csv
.
Napiwek
Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definiowanie mapowania pozyskiwania
Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Aprowizuj obiekt mapowania kolumn CSV w tej tabeli.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalowanie narzędzia NuGet w języku C#
- Zainstaluj pakiet NuGet Microsoft.Azure.Management.Kusto.
Uwierzytelnianie
Aby uruchomić poniższy przykład, potrzebujesz aplikacji Firmy Microsoft Entra i jednostki usługi, która może uzyskiwać dostęp do zasobów. Aby utworzyć bezpłatną aplikację Microsoft Entra i dodać przypisanie roli na poziomie subskrypcji, zobacz Tworzenie aplikacji Firmy Microsoft Entra. Potrzebny jest również identyfikator katalogu (dzierżawy), identyfikator aplikacji i klucz tajny klienta.
Dodawanie połączenia danych usługi Event Hubs
W poniższym przykładzie pokazano, jak programowo dodać połączenie danych usługi Event Hubs. Aby uzyskać informacje na temat dodawania połączenia danych usługi Event Hubs przy użyciu witryny Azure Portal, zobacz nawiązywanie połączenia z usługą Event Hubs .
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Ustawienie | Sugerowana wartość | Opis pola |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator dzierżawy. Znany również jako identyfikator katalogu. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator subskrypcji używany do tworzenia zasobów. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator klienta aplikacji, która może uzyskiwać dostęp do zasobów w dzierżawie. |
clientSecret | xxxxxxxxxxxxxxxx | Klucz tajny klienta aplikacji, który może uzyskiwać dostęp do zasobów w dzierżawie. |
resourceGroupName | testrg | Nazwa grupy zasobów zawierającej klaster. |
clusterName | mykustocluster | Nazwa klastra. |
databaseName | mykustodatabase | Nazwa docelowej bazy danych w klastrze. |
dataConnectionName | myeventhubconnect | Żądana nazwa połączenia danych. |
tableName | StormEvents | Nazwa tabeli docelowej w docelowej bazie danych. |
mappingRuleName | StormEvents_CSV_Mapping | Nazwa mapowania kolumny powiązana z tabelą docelową. |
dataFormat | csv | Format danych wiadomości. |
eventHubResourceId | Identyfikator zasobu | Identyfikator zasobu centrum zdarzeń, który przechowuje dane do pozyskiwania. |
consumerGroup | $Default | Grupa odbiorców centrum zdarzeń. |
lokalizacja | Środkowe stany USA | Lokalizacja zasobu połączenia danych. |
kompresja | Gzip lub Brak | Typ kompresji danych. |
Generuj dane
Zobacz przykładową aplikację , która generuje dane i wysyła je do centrum zdarzeń.
Zdarzenie może zawierać co najmniej jeden rekord do limitu rozmiaru. W poniższym przykładzie wysyłamy dwa zdarzenia, z których każdy ma pięć dołączonych rekordów:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Czyszczenie zasobów
Aby usunąć połączenie danych, użyj następującego polecenia:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);