Partilhar via


Criar uma conexão de dados do Hub de Eventos para o Azure Synapse Data Explorer usando Python (Visualização)

O Azure Synapse Data Explorer é um serviço de exploração de dados rápido e altamente escalável para dados de log e telemetria. O Azure Synapse Data Explorer oferece ingestão (carregamento de dados) de Hubs de Eventos, Hubs IoT e blobs gravados em contêineres de blob.

Neste artigo, você cria uma conexão de dados do Hub de Eventos para o Azure Synapse Data Explorer usando Python.

Pré-requisitos

  • Uma subscrição do Azure. Crie uma conta do Azure gratuita.

  • Criar um pool do Data Explorer usando o Synapse Studio ou o portal do Azure

  • Crie um banco de dados do Data Explorer.

    1. No Synapse Studio, no painel do lado esquerdo, selecione Dados.

    2. Selecione + (Adicionar novo recurso) >Pool do Data Explorer e use as seguintes informações:

      Definição Valor sugerido Description
      Nome do conjunto contosodataexplorer O nome do pool do Data Explorer a ser usado
      Nome TestDatabase O nome da base de dados tem de ser exclusivo dentro do cluster.
      Período de retenção predefinido 365 O período de tempo (em dias) durante o qual é garantido que os dados são mantidos disponíveis para consulta. O intervalo de tempo é medido desde o momento em que os dados são ingeridos.
      Período de cache padrão 31 O período de tempo (em dias) durante o qual manter os dados frequentemente consultados disponíveis no armazenamento SSD ou RAM, em vez de no armazenamento a longo prazo.
    3. Selecione Criar para criar o banco de dados. Normalmente, a criação demora menos de um minuto.

Criar uma tabela no cluster de teste

Crie uma tabela nomeada StormEvents que corresponda ao esquema dos dados no StormEvents.csv arquivo.

Gorjeta

Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir o mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de coluna usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalar pacote Python

Para instalar o pacote Python para o Azure Synapse Data Explorer, abra um prompt de comando que tenha Python em seu caminho. Execute o seguinte comando:

pip install azure-common
pip install azure-mgmt-kusto

Autenticação

Para executar o exemplo a seguir, você precisa de um aplicativo Microsoft Entra e uma entidade de serviço que possa acessar recursos. Para criar um aplicativo Microsoft Entra gratuito e adicionar atribuição de função no nível da assinatura, consulte Criar um aplicativo Microsoft Entra. Você também precisa do ID do diretório (locatário), ID do aplicativo e segredo do cliente.

Adicionar uma conexão de dados do Hub de Eventos

O exemplo a seguir mostra como adicionar uma conexão de dados do Hub de Eventos programaticamente. Consulte Conectar-se ao hub de eventos para adicionar uma conexão de dados do Hub de Eventos usando o portal do Azure.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Definição Valor sugerido Descrição do campo
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx O ID do inquilino. Também conhecido como ID de diretório.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID de assinatura que você usa para a criação de recursos.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID do cliente do aplicativo que pode acessar recursos em seu locatário.
client_secret xxxxxxxxxxxxxx O segredo do cliente do aplicativo que pode acessar recursos em seu locatário.
resource_group_name Testrg O nome do grupo de recursos que contém o cluster.
cluster_name Mykustocluster O nome do cluster.
database_name mykustodatabase O nome do banco de dados de destino no cluster.
data_connection_name MyEventHubConnect O nome desejado da sua conexão de dados.
table_name StormEvents O nome da tabela de destino no banco de dados de destino.
mapping_rule_name StormEvents_CSV_Mapping O nome do mapeamento de coluna relacionado à tabela de destino.
data_format CSV O formato de dados da mensagem.
event_hub_resource_id ID do recurso O ID do recurso do Hub de Eventos que contém os dados para ingestão.
consumer_group $Default O grupo de consumidores do seu Hub de Eventos.
localização E.U.A. Central O local do recurso de conexão de dados.

Clean up resources (Limpar recursos)

Para excluir a conexão de dados, use o seguinte comando:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Próximos passos