Skapa en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av Python (förhandsversion)
Azure Synapse Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Azure Synapse Data Explorer erbjuder inmatning (datainläsning) från Event Hubs, IoT Hubs och blobar som skrivits till blobcontainrar.
I den här artikeln skapar du en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av Python.
Förutsättningar
En Azure-prenumeration. Skapa ett kostnadsfritt Azure-konto.
Skapa en Data Explorer-pool med Synapse Studio eller Azure Portal
Skapa en Data Explorer-databas.
Välj Data i fönstret till vänster i Synapse Studio.
Välj + (Lägg till ny resurs) >Data Explorer-pool och använd följande information:
Inställning Föreslaget värde beskrivning Poolnamn contosodataexplorer Namnet på datautforskarens pool som ska användas Name TestDatabase Databasnamnet måste vara unikt inom klustret. Standardkvarhållningsperiod 365 Det tidsintervall (i dagar) då det är garanterat att data förblir tillgängliga för frågor. Tidsintervallet mäts från det att data matas in. Standardcacheperiod 31 Det tidsintervall (i dagar) då data som frågor körs mot ofta ska vara tillgängliga i SSD-lagring eller RAM i stället för i långsiktig lagring. Välj Skapa för att skapa databasen. Det brukar ta mindre än en minut att skapa en databas.
Skapa en tabell i ditt testkluster
Skapa en tabell med namnet StormEvents
som matchar schemat för data i filen StormEvents.csv
.
Dricks
Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt runnable. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definiera mappning av inmatning
Mappa inkommande CSV-data till de kolumnnamn som används när du skapade tabellen. Etablera ett CSV-kolumnmappningsobjekt i tabellen.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installera Python-paket
Om du vill installera Python-paketet för Azure Synapse Data Explorer öppnar du en kommandotolk som har Python i sin sökväg. Kör följande kommando:
pip install azure-common
pip install azure-mgmt-kusto
Autentisering
Om du vill köra följande exempel behöver du ett Microsoft Entra-program och tjänstens huvudnamn som kan komma åt resurser. Information om hur du skapar ett kostnadsfritt Microsoft Entra-program och lägger till rolltilldelning på prenumerationsnivå finns i Skapa ett Microsoft Entra-program. Du behöver även katalog-ID(klient)-ID, program-ID och klienthemlighet.
Lägga till en Händelsehubb-dataanslutning
I följande exempel visas hur du lägger till en Händelsehubb-dataanslutning programmatiskt. Se Ansluta till händelsehubben för att lägga till en Händelsehubb-dataanslutning med hjälp av Azure Portal.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Inställning | Föreslaget värde | Fältbeskrivning |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Ditt klientorganisations-ID. Kallas även katalog-ID. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Det prenumerations-ID som du använder för att skapa resurser. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Klient-ID för programmet som kan komma åt resurser i din klientorganisation. |
client_secret | xxxxxxxxxxxxxxxx | Klienthemligheten för programmet som kan komma åt resurser i din klientorganisation. |
resource_group_name | testrg | Namnet på resursgruppen som innehåller klustret. |
cluster_name | mykustocluster | Namnet på klustret. |
database_name | mykustodatabase | Namnet på måldatabasen i klustret. |
data_connection_name | myeventhubconnect | Önskat namn på dataanslutningen. |
table_name | StormEvents | Namnet på måltabellen i måldatabasen. |
mapping_rule_name | StormEvents_CSV_Mapping | Namnet på din kolumnmappning som är relaterad till måltabellen. |
data_format | csv | Meddelandets dataformat. |
event_hub_resource_id | Resurs-ID | Resurs-ID för din händelsehubb som innehåller data för inmatning. |
consumer_group | $Default | Konsumentgruppen för din händelsehubb. |
plats | USA, centrala | Platsen för dataanslutningsresursen. |
Rensa resurser
Om du vill ta bort dataanslutningen använder du följande kommando:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)