Criar uma conexão de dados do Hub de Eventos com o Azure Synapse Data Explorer usando o Python (Versão prévia)
O Azure Synapse Data Explorer é um serviço de exploração de dados rápido e altamente escalonável para dados de log e telemetria. O Azure Synapse Data Explorer oferece ingestão (carregamento de dados) de Hubs de Eventos, Hubs IoT e blobs gravados nos contêineres de blob.
Neste artigo, você criará uma conexão de dados do Hub de Eventos para o Azure Synapse Data Explorer usando o Python.
Pré-requisitos
Uma assinatura do Azure. Criar uma conta gratuita do Azure.
Criar um pool do Data Explorer usando o Synapse Studio ou o portal do Azure
Criar um banco de dados do Data Explorer.
No Synapse Studio, no painel esquerdo, selecione Dados.
Selecione + (Adicionar novo recurso) >Pool do Data Explorer e use as seguintes informações:
Configuração Valor sugerido Descrição Nome do pool contosodataexplorer O nome do pool do Data Explorer a ser usado Nome TestDatabase O nome do banco de dados deve ser exclusivo dentro do cluster. Período de retenção padrão 365 O período de tempo (em dias) durante o qual há a garantia de que os dados serão mantidos disponíveis para consulta. O período é medido a partir do momento em que os dados são incluídos. Período de cache padrão 31 O período de tempo (em dias) durante o qual os dados consultados com frequência devem ser mantidos disponíveis no armazenamento SSD ou RAM, em vez de no armazenamento de longo prazo. Selecione Criar para criar o banco de dados. A criação geralmente leva menos de um minuto.
Criar uma tabela em seu cluster de teste
Crie uma tabela chamada StormEvents
que corresponde ao esquema dos dados no arquivo StormEvents.csv
.
Dica
Os snippets de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada snippet de código individualmente executável. Em produção, as instâncias de cliente são reentrantes e devem ser mantidas desde que sejam necessárias. Uma só instância de cliente por URI é suficiente, mesmo quando você trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definir mapeamento de ingestão
Mapear os dados CSV de entrada para os nomes de colunas usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Instalar o pacote do Python
Para instalar o pacote do Python do Azure Synapse Data Explorer, abra um prompt de comando que tenha Python no caminho. Execute o comando a seguir:
pip install azure-common
pip install azure-mgmt-kusto
Autenticação
Para executar o exemplo a seguir, você precisará de um aplicativo e uma entidade de serviço do Microsoft Entra que possam acessar recursos. Para criar um aplicativo gratuito do Microsoft Entra e adicionar a atribuição de função no nível da assinatura, consulte Como criar um aplicativo do Microsoft Entra. Você também precisará da ID do diretório (locatário), da ID do aplicativo e do segredo do cliente.
Adicionar uma conexão de dados do Hub de Eventos
O exemplo a seguir mostra como adicionar uma conexão de dados do Hub de Eventos programaticamente. Confira Conectar-se ao hub de eventos para adicionar uma conexão de dados do Hub de Eventos usando o portal do Azure.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Configuração | Valor sugerido | Descrição do campo |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID do locatário. Também conhecida como ID do diretório. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | A ID da assinatura que você usa para a criação de recursos. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | A ID do cliente do aplicativo que pode acessar recursos em seu locatário. |
client_secret | xxxxxxxxxxxxxx | O segredo do cliente do aplicativo que pode acessar recursos em seu locatário. |
resource_group_name | testrg | O nome do grupo de recursos que contém o seu cluster. |
cluster_name | mykustocluster | O nome do seu cluster. |
database_name | mykustodatabase | O nome do banco de dados de destino no cluster. |
data_connection_name | myeventhubconnect | O nome desejado da conexão de dados. |
table_name | StormEvents | O nome da tabela de destino no banco de dados de destino. |
mapping_rule_name | StormEvents_CSV_Mapping | O nome do mapeamento de coluna relacionado à tabela de destino. |
data_format | csv | O formato de dados da mensagem. |
event_hub_resource_id | ID de Recurso | A ID do recurso do Hub de Eventos que contém os dados para ingestão. |
consumer_group | $Default | O grupo de consumidores do Hub de Eventos. |
local | Centro dos EUA | A localização do recurso de conexão de dados. |
Limpar recursos
Para excluir a conexão de dados, use o seguinte comando:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)