Delen via


Een Event Hub-gegevensverbinding maken voor Azure Synapse Data Explorer met behulp van Python (preview)

Azure Synapse Data Explorer is een snelle en zeer schaalbare gegevensverkenningsservice voor logboek- en telemetriegegevens. Azure Synapse Data Explorer biedt opname (gegevens laden) van Event Hubs, IoT Hubs en blobs die zijn geschreven naar blobcontainers.

In dit artikel maakt u een Event Hub-gegevensverbinding voor Azure Synapse Data Explorer met behulp van Python.

Vereisten

  • Een Azure-abonnement. Maak een gratis Azure-account.

  • Een Data Explorer-pool maken met Synapse Studio of Azure Portal

  • Maak een Data Explorer-database.

    1. Selecteer Gegevens in Synapse Studio in het linkerdeelvenster.

    2. Selecteer + (Nieuwe resource toevoegen) >Data Explorer-pool en gebruik de volgende informatie:

      Instelling Voorgestelde waarde Beschrijving
      Poolnaam contosodataexplorer De naam van de Data Explorer-pool die moet worden gebruikt
      Naam TestDatabase De databasenaam moet uniek zijn binnen het cluster.
      Standaardretentieperiode 365 De periode (in dagen) dat de gegevens gegarandeerd beschikbaar blijven voor query's. De periode wordt gemeten vanaf het moment dat de gegevens zijn opgenomen.
      Standaardcacheperiode 31 De periode (in dagen) dat vaak opgevraagde gegevens beschikbaar blijven in de SSD-opslag of het RAM-geheugen in plaats van in de langetermijnopslag.
    3. Selecteer Maken om het profiel te maken. Het maakproces duurt meestal minder dan een minuut.

Een tabel maken in het testcluster

Maak een tabel met de naam StormEvents die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv.

Tip

Met de volgende codefragmenten maakt u voor bijna elke aanroep een exemplaar van een client. Dit wordt gedaan om elk fragment afzonderlijk uit te voeren. In productie worden de clientexemplaren opnieuw in gebruik genomen en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan op opdrachtniveau worden opgegeven).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Toewijzing van opname definiëren

Wijs de binnenkomende CSV-gegevens toe aan de kolomnamen die zijn gebruikt bij het maken van de tabel. Richt een CSV-kolomtoewijzingsobject in voor die tabel.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Python-pakket installeren

Als u het Python-pakket voor Azure Synapse Data Explorer wilt installeren, opent u een opdrachtprompt met Python in het pad. Voer de volgende opdracht uit:

pip install azure-common
pip install azure-mgmt-kusto

Verificatie

Als u het volgende voorbeeld wilt uitvoeren, hebt u een Microsoft Entra-toepassing en service-principal nodig die toegang heeft tot resources. Zie Een Microsoft Entra-toepassing maken om een gratis Microsoft Entra-toepassing te maken en roltoewijzing toe te voegen op abonnementsniveau. U hebt ook de map-id (tenant), de toepassings-id en het clientgeheim nodig.

Een Event Hub-gegevensverbinding toevoegen

In het volgende voorbeeld ziet u hoe u programmatisch een Event Hub-gegevensverbinding toevoegt. Zie verbinding maken met de Event Hub voor het toevoegen van een Event Hub-gegevensverbinding met behulp van Azure Portal.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Instelling Voorgestelde waarde Veldomschrijving
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx Uw tenant-id. Ook wel map-id genoemd.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx De abonnements-id die u gebruikt voor het maken van resources.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx De client-id van de toepassing die toegang heeft tot resources in uw tenant.
client_secret xxxxxxxxxxxx Het clientgeheim van de toepassing die toegang heeft tot resources in uw tenant.
resource_group_name testrg De naam van de resourcegroep die uw cluster bevat.
cluster_name mykustocluster De naam van het cluster.
database_name mykustodatabase De naam van de doeldatabase in uw cluster.
data_connection_name myeventhubconnect De gewenste naam van uw gegevensverbinding.
table_name StormEvents De naam van de doeltabel in de doeldatabase.
mapping_rule_name StormEvents_CSV_Mapping De naam van de kolomtoewijzing die is gerelateerd aan de doeltabel.
data_format CSV De gegevensindeling van het bericht.
event_hub_resource_id Resource-id De resource-id van uw Event Hub die de gegevens bevat voor opname.
consumer_group $Default De consumentengroep van uw Event Hub.
locatie US - centraal De locatie van de gegevensverbindingsresource.

Resources opschonen

Gebruik de volgende opdracht om de gegevensverbinding te verwijderen:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Volgende stappen