Vytvoření datového připojení centra událostí pro Azure Synapse Data Explorer pomocí Pythonu (Preview)
Azure Synapse Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Azure Synapse Data Explorer nabízí příjem dat (načítání dat) ze služby Event Hubs, IoT Hubs a objektů blob zapsaných do kontejnerů objektů blob.
V tomto článku vytvoříte datové připojení centra událostí pro Průzkumníka dat Azure Synapse pomocí Pythonu.
Požadavky
Předplatné Azure. Vytvořte bezplatný účet Azure.
Vytvoření fondu Průzkumníka dat pomocí nástroje Synapse Studio nebo webu Azure Portal
Vytvořte databázi Průzkumníka dat.
V nástroji Synapse Studio v levém podokně vyberte Data.
Vyberte + fond Průzkumníka dat (přidat nový zdroj) >a použijte následující informace:
Nastavení Navrhovaná hodnota Popis Název fondu contosodataexplorer Název fondu Průzkumníka dat, který se má použít Název TestDatabase Název databáze musí být v rámci clusteru jedinečný. Výchozí doba uchovávání 365 Časové období (ve dnech), pro které je zaručeno, že jsou data k dispozici pro dotazování. Časový rozsah se začíná měřit od okamžiku, kdy jsou data ingestována. Výchozí období mezipaměti 31 Časové období (ve dnech), pro které se mají uchovávat často dotazovaná data dostupná v úložišti SSD nebo paměti RAM, a ne v dlouhodobějším úložišti. Výběrem možnosti Vytvořit vytvořte databázi. Vytvoření obvykle trvá méně než minutu.
Vytvoření tabulky v testovacím clusteru
Vytvořte tabulku s názvem StormEvents
, která odpovídá schématu StormEvents.csv
dat v souboru.
Tip
Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definování mapování ingestace
Namapujte příchozí data CSV na názvy sloupců použitých při vytváření tabulky. Zřiďte objekt mapování sloupců CSV v této tabulce.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalace balíčku Pythonu
Pokud chcete nainstalovat balíček Pythonu pro Průzkumník dat Azure Synapse, otevřete příkazový řádek s Pythonem v jeho cestě. Spusťte následující příkaz:
pip install azure-common
pip install azure-mgmt-kusto
Ověřování
Ke spuštění následujícího příkladu potřebujete aplikaci Microsoft Entra a instanční objekt, který má přístup k prostředkům. Pokud chcete vytvořit bezplatnou aplikaci Microsoft Entra a přidat přiřazení role na úrovni předplatného, přečtěte si téma Vytvoření aplikace Microsoft Entra. Potřebujete také ID adresáře (tenanta), ID aplikace a tajný klíč klienta.
Přidání datového připojení centra událostí
Následující příklad ukazuje, jak přidat datové připojení centra událostí programově. Viz připojení k centru událostí pro přidání datového připojení centra událostí pomocí webu Azure Portal.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Nastavení | Navrhovaná hodnota | Popis pole |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID vašeho tenanta Označuje se také jako ID adresáře. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID předplatného, které používáte k vytvoření prostředku. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID klienta aplikace, která má přístup k prostředkům ve vašem tenantovi. |
tajný klíč klienta | xxxxxxxxxxxxxxxx | Tajný klíč klienta aplikace, která má přístup k prostředkům ve vašem tenantovi. |
resource_group_name | testrg | Název skupiny prostředků obsahující váš cluster. |
cluster_name | mykustocluster | Název clusteru. |
database_name | mykustodatabase | Název cílové databáze v clusteru |
data_connection_name | myeventhubconnect | Požadovaný název datového připojení. |
table_name | StormEvents | Název cílové tabulky v cílové databázi. |
mapping_rule_name | StormEvents_CSV_Mapping | Název mapování sloupců související s cílovou tabulkou. |
data_format | csv | Formát dat zprávy. |
event_hub_resource_id | ID prostředku | ID prostředku vašeho centra událostí, které obsahuje data pro příjem dat. |
consumer_group | $Default | Skupina příjemců vašeho centra událostí. |
location | USA – střed | Umístění prostředku datového připojení |
Vyčištění prostředků
Pokud chcete datové připojení odstranit, použijte následující příkaz:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)