Dela via


Skapa en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av Python (förhandsversion)

Azure Synapse Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Azure Synapse Data Explorer erbjuder inmatning (datainläsning) från Event Hubs, IoT Hubs och blobar som skrivits till blobcontainrar.

I den här artikeln skapar du en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av Python.

Förutsättningar

  • En Azure-prenumeration. Skapa ett kostnadsfritt Azure-konto.

  • Skapa en Data Explorer-pool med Synapse Studio eller Azure Portal

  • Skapa en Data Explorer-databas.

    1. Välj Data i fönstret till vänster i Synapse Studio.

    2. Välj + (Lägg till ny resurs) >Data Explorer-pool och använd följande information:

      Inställning Föreslaget värde beskrivning
      Poolnamn contosodataexplorer Namnet på datautforskarens pool som ska användas
      Name TestDatabase Databasnamnet måste vara unikt inom klustret.
      Standardkvarhållningsperiod 365 Det tidsintervall (i dagar) då det är garanterat att data förblir tillgängliga för frågor. Tidsintervallet mäts från det att data matas in.
      Standardcacheperiod 31 Det tidsintervall (i dagar) då data som frågor körs mot ofta ska vara tillgängliga i SSD-lagring eller RAM i stället för i långsiktig lagring.
    3. Välj Skapa för att skapa databasen. Det brukar ta mindre än en minut att skapa en databas.

Skapa en tabell i ditt testkluster

Skapa en tabell med namnet StormEvents som matchar schemat för data i filen StormEvents.csv.

Dricks

Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt runnable. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definiera mappning av inmatning

Mappa inkommande CSV-data till de kolumnnamn som används när du skapade tabellen. Etablera ett CSV-kolumnmappningsobjekt i tabellen.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installera Python-paket

Om du vill installera Python-paketet för Azure Synapse Data Explorer öppnar du en kommandotolk som har Python i sin sökväg. Kör följande kommando:

pip install azure-common
pip install azure-mgmt-kusto

Autentisering

Om du vill köra följande exempel behöver du ett Microsoft Entra-program och tjänstens huvudnamn som kan komma åt resurser. Information om hur du skapar ett kostnadsfritt Microsoft Entra-program och lägger till rolltilldelning på prenumerationsnivå finns i Skapa ett Microsoft Entra-program. Du behöver även katalog-ID(klient)-ID, program-ID och klienthemlighet.

Lägga till en Händelsehubb-dataanslutning

I följande exempel visas hur du lägger till en Händelsehubb-dataanslutning programmatiskt. Se Ansluta till händelsehubben för att lägga till en Händelsehubb-dataanslutning med hjälp av Azure Portal.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Inställning Föreslaget värde Fältbeskrivning
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Ditt klientorganisations-ID. Kallas även katalog-ID.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Det prenumerations-ID som du använder för att skapa resurser.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Klient-ID för programmet som kan komma åt resurser i din klientorganisation.
client_secret xxxxxxxxxxxxxxxx Klienthemligheten för programmet som kan komma åt resurser i din klientorganisation.
resource_group_name testrg Namnet på resursgruppen som innehåller klustret.
cluster_name mykustocluster Namnet på klustret.
database_name mykustodatabase Namnet på måldatabasen i klustret.
data_connection_name myeventhubconnect Önskat namn på dataanslutningen.
table_name StormEvents Namnet på måltabellen i måldatabasen.
mapping_rule_name StormEvents_CSV_Mapping Namnet på din kolumnmappning som är relaterad till måltabellen.
data_format csv Meddelandets dataformat.
event_hub_resource_id Resurs-ID Resurs-ID för din händelsehubb som innehåller data för inmatning.
consumer_group $Default Konsumentgruppen för din händelsehubb.
plats USA, centrala Platsen för dataanslutningsresursen.

Rensa resurser

Om du vill ta bort dataanslutningen använder du följande kommando:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Nästa steg