Udostępnij za pośrednictwem


Tworzenie połączenia danych centrum zdarzeń dla usługi Azure Synapse Data Explorer przy użyciu języka Python (wersja zapoznawcza)

Usługa Azure Synapse Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dzienników i danych telemetrycznych. Usługa Azure Synapse Data Explorer oferuje pozyskiwanie danych (ładowanie danych) z usług Event Hubs, IoT Hubs i obiektów blob zapisanych w kontenerach obiektów blob.

W tym artykule utworzysz połączenie danych centrum zdarzeń dla usługi Azure Synapse Data Explorer przy użyciu języka Python.

Wymagania wstępne

  • Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure.

  • Tworzenie puli eksploratora danych przy użyciu programu Synapse Studio lub witryny Azure Portal

  • Utwórz bazę danych eksploratora danych.

    1. W programie Synapse Studio w okienku po lewej stronie wybierz pozycję Dane.

    2. Wybierz + pozycję (Dodaj nowy zasób) >pulę Eksploratora danych i użyj następujących informacji:

      Ustawienie Sugerowana wartość opis
      Nazwa puli contosodataexplorer Nazwa puli Eksploratora danych do użycia
      Nazwisko TestDatabase Nazwa bazy danych musi być unikatowa w obrębie klastra.
      Domyślny okres przechowywania 365 Przedział czasu (w dniach), w którym gwarantowana jest dostępność danych dla zapytania. Przedział czasu jest mierzony od momentu pozyskania danych.
      Domyślny okres pamięci podręcznej 31 Przedział czasu (w dniach), w którym często używane w zapytaniach dane mają być dostępne na dysku SSD lub w pamięci RAM zamiast w magazynie długoterminowym.
    3. Wybierz pozycję Utwórz, aby utworzyć bazę danych. Tworzenie zazwyczaj zajmuje mniej niż minutę.

Tworzenie tabeli w klastrze testowym

Utwórz tabelę o nazwie StormEvents, która będzie zgodna ze schematem danych w pliku StormEvents.csv.

Napiwek

Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definiowanie mapowania pozyskiwania

Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Aprowizuj obiekt mapowania kolumn CSV w tej tabeli.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalowanie pakietu języka Python

Aby zainstalować pakiet języka Python dla usługi Azure Synapse Data Explorer, otwórz wiersz polecenia z językiem Python w swojej ścieżce. Uruchom następujące polecenie:

pip install azure-common
pip install azure-mgmt-kusto

Uwierzytelnianie

Aby uruchomić poniższy przykład, potrzebujesz aplikacji Firmy Microsoft Entra i jednostki usługi, która może uzyskiwać dostęp do zasobów. Aby utworzyć bezpłatną aplikację Microsoft Entra i dodać przypisanie roli na poziomie subskrypcji, zobacz Tworzenie aplikacji Firmy Microsoft Entra. Potrzebny jest również identyfikator katalogu (dzierżawy), identyfikator aplikacji i klucz tajny klienta.

Dodawanie połączenia danych centrum zdarzeń

W poniższym przykładzie pokazano, jak programowo dodać połączenie danych centrum zdarzeń. Zobacz Nawiązywanie połączenia z centrum zdarzeń w celu dodania połączenia danych centrum zdarzeń przy użyciu witryny Azure Portal.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Ustawienie Sugerowana wartość Opis pola
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator dzierżawy. Znany również jako identyfikator katalogu.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator subskrypcji używany do tworzenia zasobów.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Identyfikator klienta aplikacji, która może uzyskiwać dostęp do zasobów w dzierżawie.
client_secret xxxxxxxxxxxxxxxx Klucz tajny klienta aplikacji, który może uzyskiwać dostęp do zasobów w dzierżawie.
resource_group_name testrg Nazwa grupy zasobów zawierającej klaster.
cluster_name mykustocluster Nazwa klastra.
database_name mykustodatabase Nazwa docelowej bazy danych w klastrze.
data_connection_name myeventhubconnect Żądana nazwa połączenia danych.
table_name StormEvents Nazwa tabeli docelowej w docelowej bazie danych.
mapping_rule_name StormEvents_CSV_Mapping Nazwa mapowania kolumny powiązana z tabelą docelową.
data_format csv Format danych wiadomości.
event_hub_resource_id Identyfikator zasobu Identyfikator zasobu centrum zdarzeń, który przechowuje dane do pozyskiwania.
consumer_group $Default Grupa odbiorców centrum zdarzeń.
lokalizacja Środkowe stany USA Lokalizacja zasobu połączenia danych.

Czyszczenie zasobów

Aby usunąć połączenie danych, użyj następującego polecenia:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Następne kroki