Tworzenie połączenia danych centrum zdarzeń dla usługi Azure Synapse Data Explorer przy użyciu języka Python (wersja zapoznawcza)
Usługa Azure Synapse Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dzienników i danych telemetrycznych. Usługa Azure Synapse Data Explorer oferuje pozyskiwanie danych (ładowanie danych) z usług Event Hubs, IoT Hubs i obiektów blob zapisanych w kontenerach obiektów blob.
W tym artykule utworzysz połączenie danych centrum zdarzeń dla usługi Azure Synapse Data Explorer przy użyciu języka Python.
Wymagania wstępne
Subskrypcja platformy Azure. Utwórz bezpłatne konto platformy Azure.
Tworzenie puli eksploratora danych przy użyciu programu Synapse Studio lub witryny Azure Portal
Utwórz bazę danych eksploratora danych.
W programie Synapse Studio w okienku po lewej stronie wybierz pozycję Dane.
Wybierz + pozycję (Dodaj nowy zasób) >pulę Eksploratora danych i użyj następujących informacji:
Ustawienie Sugerowana wartość opis Nazwa puli contosodataexplorer Nazwa puli Eksploratora danych do użycia Nazwisko TestDatabase Nazwa bazy danych musi być unikatowa w obrębie klastra. Domyślny okres przechowywania 365 Przedział czasu (w dniach), w którym gwarantowana jest dostępność danych dla zapytania. Przedział czasu jest mierzony od momentu pozyskania danych. Domyślny okres pamięci podręcznej 31 Przedział czasu (w dniach), w którym często używane w zapytaniach dane mają być dostępne na dysku SSD lub w pamięci RAM zamiast w magazynie długoterminowym. Wybierz pozycję Utwórz, aby utworzyć bazę danych. Tworzenie zazwyczaj zajmuje mniej niż minutę.
Tworzenie tabeli w klastrze testowym
Utwórz tabelę o nazwie StormEvents
, która będzie zgodna ze schematem danych w pliku StormEvents.csv
.
Napiwek
Poniższe fragmenty kodu tworzą wystąpienie klienta dla niemal każdego wywołania. Należy to zrobić, aby każdy fragment kodu był uruchamiany indywidualnie. W środowisku produkcyjnym wystąpienia klienta są ponowne i powinny być przechowywane tak długo, jak to konieczne. Pojedyncze wystąpienie klienta na identyfikator URI jest wystarczające, nawet podczas pracy z wieloma bazami danych (bazę danych można określić na poziomie polecenia).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definiowanie mapowania pozyskiwania
Zamapuj przychodzące dane CSV na nazwy kolumn używane podczas tworzenia tabeli. Aprowizuj obiekt mapowania kolumn CSV w tej tabeli.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalowanie pakietu języka Python
Aby zainstalować pakiet języka Python dla usługi Azure Synapse Data Explorer, otwórz wiersz polecenia z językiem Python w swojej ścieżce. Uruchom następujące polecenie:
pip install azure-common
pip install azure-mgmt-kusto
Uwierzytelnianie
Aby uruchomić poniższy przykład, potrzebujesz aplikacji Firmy Microsoft Entra i jednostki usługi, która może uzyskiwać dostęp do zasobów. Aby utworzyć bezpłatną aplikację Microsoft Entra i dodać przypisanie roli na poziomie subskrypcji, zobacz Tworzenie aplikacji Firmy Microsoft Entra. Potrzebny jest również identyfikator katalogu (dzierżawy), identyfikator aplikacji i klucz tajny klienta.
Dodawanie połączenia danych centrum zdarzeń
W poniższym przykładzie pokazano, jak programowo dodać połączenie danych centrum zdarzeń. Zobacz Nawiązywanie połączenia z centrum zdarzeń w celu dodania połączenia danych centrum zdarzeń przy użyciu witryny Azure Portal.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Ustawienie | Sugerowana wartość | Opis pola |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator dzierżawy. Znany również jako identyfikator katalogu. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator subskrypcji używany do tworzenia zasobów. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Identyfikator klienta aplikacji, która może uzyskiwać dostęp do zasobów w dzierżawie. |
client_secret | xxxxxxxxxxxxxxxx | Klucz tajny klienta aplikacji, który może uzyskiwać dostęp do zasobów w dzierżawie. |
resource_group_name | testrg | Nazwa grupy zasobów zawierającej klaster. |
cluster_name | mykustocluster | Nazwa klastra. |
database_name | mykustodatabase | Nazwa docelowej bazy danych w klastrze. |
data_connection_name | myeventhubconnect | Żądana nazwa połączenia danych. |
table_name | StormEvents | Nazwa tabeli docelowej w docelowej bazie danych. |
mapping_rule_name | StormEvents_CSV_Mapping | Nazwa mapowania kolumny powiązana z tabelą docelową. |
data_format | csv | Format danych wiadomości. |
event_hub_resource_id | Identyfikator zasobu | Identyfikator zasobu centrum zdarzeń, który przechowuje dane do pozyskiwania. |
consumer_group | $Default | Grupa odbiorców centrum zdarzeń. |
lokalizacja | Środkowe stany USA | Lokalizacja zasobu połączenia danych. |
Czyszczenie zasobów
Aby usunąć połączenie danych, użyj następującego polecenia:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)