Een Event Hub-gegevensverbinding maken voor Azure Synapse Data Explorer met behulp van Python (preview)
Azure Synapse Data Explorer is een snelle en zeer schaalbare gegevensverkenningsservice voor logboek- en telemetriegegevens. Azure Synapse Data Explorer biedt opname (gegevens laden) van Event Hubs, IoT Hubs en blobs die zijn geschreven naar blobcontainers.
In dit artikel maakt u een Event Hub-gegevensverbinding voor Azure Synapse Data Explorer met behulp van Python.
Vereisten
Een Azure-abonnement. Maak een gratis Azure-account.
Een Data Explorer-pool maken met Synapse Studio of Azure Portal
Maak een Data Explorer-database.
Selecteer Gegevens in Synapse Studio in het linkerdeelvenster.
Selecteer + (Nieuwe resource toevoegen) >Data Explorer-pool en gebruik de volgende informatie:
Instelling Voorgestelde waarde Beschrijving Poolnaam contosodataexplorer De naam van de Data Explorer-pool die moet worden gebruikt Naam TestDatabase De databasenaam moet uniek zijn binnen het cluster. Standaardretentieperiode 365 De periode (in dagen) dat de gegevens gegarandeerd beschikbaar blijven voor query's. De periode wordt gemeten vanaf het moment dat de gegevens zijn opgenomen. Standaardcacheperiode 31 De periode (in dagen) dat vaak opgevraagde gegevens beschikbaar blijven in de SSD-opslag of het RAM-geheugen in plaats van in de langetermijnopslag. Selecteer Maken om het profiel te maken. Het maakproces duurt meestal minder dan een minuut.
Een tabel maken in het testcluster
Maak een tabel met de naam StormEvents
die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv
.
Tip
Met de volgende codefragmenten maakt u voor bijna elke aanroep een exemplaar van een client. Dit wordt gedaan om elk fragment afzonderlijk uit te voeren. In productie worden de clientexemplaren opnieuw in gebruik genomen en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan op opdrachtniveau worden opgegeven).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Toewijzing van opname definiëren
Wijs de binnenkomende CSV-gegevens toe aan de kolomnamen die zijn gebruikt bij het maken van de tabel. Richt een CSV-kolomtoewijzingsobject in voor die tabel.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Python-pakket installeren
Als u het Python-pakket voor Azure Synapse Data Explorer wilt installeren, opent u een opdrachtprompt met Python in het pad. Voer de volgende opdracht uit:
pip install azure-common
pip install azure-mgmt-kusto
Verificatie
Als u het volgende voorbeeld wilt uitvoeren, hebt u een Microsoft Entra-toepassing en service-principal nodig die toegang heeft tot resources. Zie Een Microsoft Entra-toepassing maken om een gratis Microsoft Entra-toepassing te maken en roltoewijzing toe te voegen op abonnementsniveau. U hebt ook de map-id (tenant), de toepassings-id en het clientgeheim nodig.
Een Event Hub-gegevensverbinding toevoegen
In het volgende voorbeeld ziet u hoe u programmatisch een Event Hub-gegevensverbinding toevoegt. Zie verbinding maken met de Event Hub voor het toevoegen van een Event Hub-gegevensverbinding met behulp van Azure Portal.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Instelling | Voorgestelde waarde | Veldomschrijving |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | Uw tenant-id. Ook wel map-id genoemd. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | De abonnements-id die u gebruikt voor het maken van resources. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | De client-id van de toepassing die toegang heeft tot resources in uw tenant. |
client_secret | xxxxxxxxxxxx | Het clientgeheim van de toepassing die toegang heeft tot resources in uw tenant. |
resource_group_name | testrg | De naam van de resourcegroep die uw cluster bevat. |
cluster_name | mykustocluster | De naam van het cluster. |
database_name | mykustodatabase | De naam van de doeldatabase in uw cluster. |
data_connection_name | myeventhubconnect | De gewenste naam van uw gegevensverbinding. |
table_name | StormEvents | De naam van de doeltabel in de doeldatabase. |
mapping_rule_name | StormEvents_CSV_Mapping | De naam van de kolomtoewijzing die is gerelateerd aan de doeltabel. |
data_format | CSV | De gegevensindeling van het bericht. |
event_hub_resource_id | Resource-id | De resource-id van uw Event Hub die de gegevens bevat voor opname. |
consumer_group | $Default | De consumentengroep van uw Event Hub. |
locatie | US - centraal | De locatie van de gegevensverbindingsresource. |
Resources opschonen
Gebruik de volgende opdracht om de gegevensverbinding te verwijderen:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)