Skapa en Event Hubs-dataanslutning för Azure Synapse Data Explorer med hjälp av C# (förhandsversion)
Azure Synapse Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Azure Synapse Data Explorer erbjuder inmatning (datainläsning) från Event Hubs, IoT Hubs och blobar som skrivits till blobcontainrar.
I den här artikeln skapar du en Event Hubs-dataanslutning för Azure Synapse Data Explorer med hjälp av C#.
Förutsättningar
En Azure-prenumeration. Skapa ett kostnadsfritt Azure-konto.
Skapa en Data Explorer-pool med Synapse Studio eller Azure Portal
Skapa en Data Explorer-databas.
Välj Data i fönstret till vänster i Synapse Studio.
Välj + (Lägg till ny resurs) >Data Explorer-pool och använd följande information:
Inställning Föreslaget värde beskrivning Poolnamn contosodataexplorer Namnet på datautforskarens pool som ska användas Name TestDatabase Databasnamnet måste vara unikt inom klustret. Standardkvarhållningsperiod 365 Det tidsintervall (i dagar) då det är garanterat att data förblir tillgängliga för frågor. Tidsintervallet mäts från det att data matas in. Standardcacheperiod 31 Det tidsintervall (i dagar) då data som frågor körs mot ofta ska vara tillgängliga i SSD-lagring eller RAM i stället för i långsiktig lagring. Välj Skapa för att skapa databasen. Det brukar ta mindre än en minut att skapa en databas.
Kommentar
Inmatning av data från en händelsehubb till Data Explorer-pooler fungerar inte om din Synapse-arbetsyta använder ett hanterat virtuellt nätverk med dataexfiltreringsskydd aktiverat.
- Visual Studio 2019, ladda ned och använd den kostnadsfriaVisual Studio 2019 Community Edition. Aktivera Azure-utveckling under Visual Studio-installationen.
Skapa en tabell i ditt testkluster
Skapa en tabell med namnet StormEvents
som matchar schemat för data i filen StormEvents.csv
.
Dricks
Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt runnable. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definiera mappning av inmatning
Mappa inkommande CSV-data till de kolumnnamn som används när du skapade tabellen. Etablera ett CSV-kolumnmappningsobjekt i tabellen.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installera C# NuGet
- Installera NuGet-paketet Microsoft.Azure.Management.Kusto.
Autentisering
Om du vill köra följande exempel behöver du ett Microsoft Entra-program och tjänstens huvudnamn som kan komma åt resurser. Information om hur du skapar ett kostnadsfritt Microsoft Entra-program och lägger till rolltilldelning på prenumerationsnivå finns i Skapa ett Microsoft Entra-program. Du behöver även katalog-ID(klient)-ID, program-ID och klienthemlighet.
Lägga till en Event Hubs-dataanslutning
I följande exempel visas hur du lägger till en Event Hubs-dataanslutning programmatiskt. Mer information om hur du lägger till en Event Hubs-dataanslutning med hjälp av Azure Portal finns i Ansluta till Händelsehubbar.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Inställning | Föreslaget värde | Fältbeskrivning |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Ditt klientorganisations-ID. Kallas även katalog-ID. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Det prenumerations-ID som du använder för att skapa resurser. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Klient-ID för programmet som kan komma åt resurser i din klientorganisation. |
clientSecret | xxxxxxxxxxxxxxxx | Klienthemligheten för programmet som kan komma åt resurser i din klientorganisation. |
resourceGroupName | testrg | Namnet på resursgruppen som innehåller klustret. |
clusterName | mykustocluster | Namnet på klustret. |
databaseName | mykustodatabase | Namnet på måldatabasen i klustret. |
dataConnectionName | myeventhubconnect | Önskat namn på dataanslutningen. |
tableName | StormEvents | Namnet på måltabellen i måldatabasen. |
mappingRuleName | StormEvents_CSV_Mapping | Namnet på din kolumnmappning som är relaterad till måltabellen. |
dataFormat | csv | Meddelandets dataformat. |
eventHubResourceId | Resurs-ID | Resurs-ID för din händelsehubb som innehåller data för inmatning. |
consumerGroup | $Default | Konsumentgruppen för din händelsehubb. |
plats | USA, centrala | Platsen för dataanslutningsresursen. |
komprimering | Gzip eller Ingen | Typen av datakomprimering. |
Generera data
Se exempelappen som genererar data och skickar dem till en händelsehubb.
En händelse kan innehålla en eller flera poster, upp till dess storleksgräns. I följande exempel skickar vi två händelser, där var och en har fem poster tillagt:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Rensa resurser
Om du vill ta bort dataanslutningen använder du följande kommando:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);