Erstellen einer Event Hubs-Datenverbindung für Azure Synapse Data Explorer mit C# (Vorschau)
Azure Synapse Data Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Protokoll- und Telemetriedaten. Azure Synapse Data Explorer bietet die Erfassung von Daten aus Event Hubs, IoT Hubs und Blobs, die in Blobcontainer geschrieben werden.
In diesem Artikel erstellen Sie eine Event Hubs-Datenverbindung für Azure Synapse Data Explorer mithilfe von C#.
Voraussetzungen
Ein Azure-Abonnement. Erstellen Sie ein kostenloses Azure-Konto.
Erstellen eines Data Explorer-Pools über Synapse Studio oder im Azure-Portal
Erstellen Sie eine Data Explorer-Datenbank.
Wählen Sie in Synapse Studio im linken Bereich Daten aus.
Wählen Sie + (Neue Ressource hinzufügen) >Data Explorer-Pool aus, und verwenden Sie die folgenden Informationen:
Einstellung Vorgeschlagener Wert BESCHREIBUNG Poolname contosodataexplorer Name des zu verwendende Data Explorer-Pools Name TestDatabase Der Datenbankname muss innerhalb des Clusters eindeutig sein. Standardaufbewahrungszeitraum 365 Die Zeitspanne (in Tagen), für die garantiert wird, dass die Daten für Abfragen verfügbar bleiben. Die Zeitspanne wird ab dem Zeitpunkt gemessen, zu dem die Daten erfasst werden. Standardcachezeitraum 31 Die Zeitspanne (in Tagen), wie lange häufig abgefragte Daten im SSD-Speicher oder RAM (und nicht im längerfristigen Speicher) verfügbar bleiben. Wählen Sie Erstellen, um die Datenbank zu erstellen. Die Erstellung dauert in der Regel weniger als eine Minute.
Hinweis
Das Erfassen von Daten aus einem Event Hub in Data Explorer-Pools funktioniert nicht, wenn Ihr Synapse-Arbeitsbereich ein verwaltetes virtuelles Netzwerk mit aktiviertem Datenexfiltrationsschutz verwendet.
- Visual Studio 2019. Sie können die kostenlose Visual Studio 2019 Community Edition herunterladen und verwenden. Aktivieren Sie beim Setup von Visual Studio die Option Azure-Entwicklung.
Erstellen einer Tabelle im Testcluster
Erstellen Sie eine Tabelle mit dem Namen StormEvents
, die dem Schema der Daten in der Datei StormEvents.csv
entspricht.
Tipp
Die folgenden Codeausschnitte erstellen eine Instanz eines Clients für fast jeden Aufruf. Dadurch kann jeder Ausschnitt einzeln ausgeführt werden. In der Produktionsumgebung sind die Clientinstanzen wiedereintrittsfähig und sollten so lange wie nötig aufbewahrt werden. Eine einzelne Clientinstanz pro URI ist ausreichend, auch wenn Sie mit mehreren Datenbanken arbeiten (Datenbanken können auf Befehlsebene angegeben werden).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definieren der Erfassungszuordnung
Ordnen Sie die eingehenden CSV-Daten den beim Erstellen der Tabelle verwendeten Spaltennamen zu. Stellen Sie ein Objekt für die CSV-Spaltenzuordnung in dieser Tabelle bereit.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installieren eines C#-NuGet-Pakets
- Installieren Sie das NuGet-Paket Microsoft.Azure.Management.Kusto.
Authentifizierung
Um das folgende Beispiel ausführen zu können, benötigen Sie eine Microsoft Entra-Anwendung und einen Dienstprinzipal, der auf Ressourcen zugreifen kann. Informationen zum Erstellen einer kostenlosen Microsoft Entra-Anwendung und Hinzufügen einer Rollenzuweisung auf Abonnementebene finden Sie unter Erstellen einer Microsoft Entra-Anwendung. Außerdem benötigen Sie die Verzeichnis-ID (Mandanten-ID), die Anwendungs-ID und den geheimen Clientschlüssel.
Hinzufügen einer Event Hubs-Datenverbindung
Im folgenden Beispiel wird gezeigt, wie eine Event Hubs-Datenverbindung programmgesteuert hinzugefügt wird. Weitere Informationen zum Hinzufügen einer Event Hubs-Datenverbindung mithilfe des Azure-Portals finden Sie unter Herstellen einer Verbindung mit dem Event Hub.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Einstellung | Empfohlener Wert | Feldbeschreibung |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Ihre Mandanten-ID. Wird auch als Verzeichnis-ID bezeichnet. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Die Abonnement-ID, die Sie für die Ressourcenerstellung verwenden. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Die Client-ID der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann. |
clientSecret | xxxxxxxxxxxxxx | Das Clientgeheimnis der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann. |
resourceGroupName | testrg | Der Name der Ressourcengruppe, die Ihren Cluster enthält. |
clusterName | mykustocluster | Der Name Ihres Clusters. |
databaseName | mykustodatabase | Der Name der Zieldatenbank in Ihrem Cluster. |
dataConnectionName | myeventhubconnect | Der gewünschte Name Ihrer Datenverbindung. |
tableName | StormEvents | Der Name der Zieltabelle in der Zieldatenbank. |
mappingRuleName | StormEvents_CSV_Mapping | Der Name der Spaltenzuordnung, die mit der Zieltabelle verknüpft ist. |
dataFormat | csv | Das Datenformat der Nachricht. |
eventHubResourceId | Ressourcen-ID | Die Ressourcen-ID Ihres Event Hubs mit den Daten für die Erfassung |
consumerGroup | $Default | Die Consumergruppe Ihres Event Hubs. |
location | USA, Mitte | Der Speicherort der Datenverbindungsressource. |
compression | Gzip oder Keine | Der Typ der Datenkomprimierung |
Generieren von Daten
Sehen Sie sich die Beispiel-App an, die Daten generiert und an einen Event Hub sendet.
Ein Ereignis kann entsprechend dem Größenlimit einen oder mehrere Datensätze enthalten. Im folgenden Beispiel werden zwei Ereignisse mit jeweils fünf angefügten Datensätzen gesendet:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Bereinigen von Ressourcen
Um die Datenverbindung zu löschen, verwenden Sie den folgenden Befehl:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);