Een Event Hubs-gegevensverbinding maken voor Azure Synapse Data Explorer met behulp van C# (preview)
Azure Synapse Data Explorer is een snelle en zeer schaalbare gegevensverkenningsservice voor logboek- en telemetriegegevens. Azure Synapse Data Explorer biedt opname (gegevens laden) van Event Hubs, IoT Hubs en blobs die zijn geschreven naar blobcontainers.
In dit artikel maakt u een Event Hubs-gegevensverbinding voor Azure Synapse Data Explorer met behulp van C#.
Vereisten
Een Azure-abonnement. Maak een gratis Azure-account.
Een Data Explorer-pool maken met Synapse Studio of Azure Portal
Maak een Data Explorer-database.
Selecteer Gegevens in Synapse Studio in het linkerdeelvenster.
Selecteer + (Nieuwe resource toevoegen) >Data Explorer-pool en gebruik de volgende informatie:
Instelling Voorgestelde waarde Beschrijving Poolnaam contosodataexplorer De naam van de Data Explorer-pool die moet worden gebruikt Naam TestDatabase De databasenaam moet uniek zijn binnen het cluster. Standaardretentieperiode 365 De periode (in dagen) dat de gegevens gegarandeerd beschikbaar blijven voor query's. De periode wordt gemeten vanaf het moment dat de gegevens zijn opgenomen. Standaardcacheperiode 31 De periode (in dagen) dat vaak opgevraagde gegevens beschikbaar blijven in de SSD-opslag of het RAM-geheugen in plaats van in de langetermijnopslag. Selecteer Maken om het profiel te maken. Het maakproces duurt meestal minder dan een minuut.
Notitie
Het opnemen van gegevens van een Event Hub in Data Explorer-pools werkt niet als uw Synapse-werkruimte gebruikmaakt van een beheerd virtueel netwerk met gegevensexfiltratiebeveiliging ingeschakeld.
- Visual Studio 2019, download en gebruik de gratis Visual Studio 2019 Community Edition. Azure-ontwikkeling inschakelen tijdens de installatie van Visual Studio.
Een tabel maken in het testcluster
Maak een tabel met de naam StormEvents
die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv
.
Tip
Met de volgende codefragmenten maakt u voor bijna elke aanroep een exemplaar van een client. Dit wordt gedaan om elk fragment afzonderlijk uit te voeren. In productie worden de clientexemplaren opnieuw in gebruik genomen en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan op opdrachtniveau worden opgegeven).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Toewijzing van opname definiëren
Wijs de binnenkomende CSV-gegevens toe aan de kolomnamen die zijn gebruikt bij het maken van de tabel. Richt een CSV-kolomtoewijzingsobject in voor die tabel.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
C# NuGet installeren
- Installeer het Microsoft.Azure.Management.Kusto NuGet-pakket.
Verificatie
Als u het volgende voorbeeld wilt uitvoeren, hebt u een Microsoft Entra-toepassing en service-principal nodig die toegang heeft tot resources. Zie Een Microsoft Entra-toepassing maken om een gratis Microsoft Entra-toepassing te maken en roltoewijzing toe te voegen op abonnementsniveau. U hebt ook de map-id (tenant), de toepassings-id en het clientgeheim nodig.
Een Event Hubs-gegevensverbinding toevoegen
In het volgende voorbeeld ziet u hoe u programmatisch een Event Hubs-gegevensverbinding toevoegt. Zie verbinding maken met de Event Hubs voor informatie over het toevoegen van een Event Hubs-gegevensverbinding met behulp van Azure Portal.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Instelling | Voorgestelde waarde | Veldomschrijving |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | Uw tenant-id. Ook wel map-id genoemd. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | De abonnements-id die u gebruikt voor het maken van resources. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | De client-id van de toepassing die toegang heeft tot resources in uw tenant. |
clientSecret | xxxxxxxxxxxx | Het clientgeheim van de toepassing die toegang heeft tot resources in uw tenant. |
resourceGroupName | testrg | De naam van de resourcegroep die uw cluster bevat. |
clusterName | mykustocluster | De naam van het cluster. |
databaseName | mykustodatabase | De naam van de doeldatabase in uw cluster. |
dataConnectionName | myeventhubconnect | De gewenste naam van uw gegevensverbinding. |
tableName | StormEvents | De naam van de doeltabel in de doeldatabase. |
mappingRuleName | StormEvents_CSV_Mapping | De naam van de kolomtoewijzing die is gerelateerd aan de doeltabel. |
dataFormat | CSV | De gegevensindeling van het bericht. |
eventHubResourceId | Resource-id | De resource-id van uw Event Hub die de gegevens bevat voor opname. |
consumerGroup | $Default | De consumentengroep van uw Event Hub. |
locatie | US - centraal | De locatie van de gegevensverbindingsresource. |
compressie | Gzip of Geen | Het type gegevenscompressie. |
Gegevens genereren
Bekijk de voorbeeld-app waarmee gegevens worden gegenereerd en verzonden naar een Event Hub.
Een gebeurtenis kan een of meer records bevatten, tot de groottelimiet. In het volgende voorbeeld verzenden we twee gebeurtenissen, waarbij elk vijf records is toegevoegd:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Resources opschonen
Gebruik de volgende opdracht om de gegevensverbinding te verwijderen:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);