Partilhar via


Criar uma conexão de dados de Hubs de Eventos para o Azure Synapse Data Explorer usando C# (Visualização)

O Azure Synapse Data Explorer é um serviço de exploração de dados rápido e altamente escalável para dados de log e telemetria. O Azure Synapse Data Explorer oferece ingestão (carregamento de dados) de Hubs de Eventos, Hubs IoT e blobs gravados em contêineres de blob.

Neste artigo, você cria uma conexão de dados de Hubs de Eventos para o Azure Synapse Data Explorer usando C#.

Pré-requisitos

  • Uma subscrição do Azure. Crie uma conta do Azure gratuita.

  • Criar um pool do Data Explorer usando o Synapse Studio ou o portal do Azure

  • Crie um banco de dados do Data Explorer.

    1. No Synapse Studio, no painel do lado esquerdo, selecione Dados.

    2. Selecione + (Adicionar novo recurso) >Pool do Data Explorer e use as seguintes informações:

      Definição Valor sugerido Description
      Nome do conjunto contosodataexplorer O nome do pool do Data Explorer a ser usado
      Nome TestDatabase O nome da base de dados tem de ser exclusivo dentro do cluster.
      Período de retenção predefinido 365 O período de tempo (em dias) durante o qual é garantido que os dados são mantidos disponíveis para consulta. O intervalo de tempo é medido desde o momento em que os dados são ingeridos.
      Período de cache padrão 31 O período de tempo (em dias) durante o qual manter os dados frequentemente consultados disponíveis no armazenamento SSD ou RAM, em vez de no armazenamento a longo prazo.
    3. Selecione Criar para criar o banco de dados. Normalmente, a criação demora menos de um minuto.

Nota

A ingestão de dados de um Hub de Eventos em pools do Data Explorer não funcionará se o espaço de trabalho Synapse usar uma rede virtual gerenciada com a proteção contra exfiltração de dados habilitada.

  • Visual Studio 2019, baixe e use o Visual Studio 2019 Community Edition gratuito. Habilite o desenvolvimento do Azure durante a instalação do Visual Studio.

Criar uma tabela no cluster de teste

Crie uma tabela nomeada StormEvents que corresponda ao esquema dos dados no StormEvents.csv arquivo.

Gorjeta

Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definir o mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de coluna usados ao criar a tabela. Provisione um objeto de mapeamento de coluna CSV nessa tabela.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalar o C# NuGet

Autenticação

Para executar o exemplo a seguir, você precisa de um aplicativo Microsoft Entra e uma entidade de serviço que possa acessar recursos. Para criar um aplicativo Microsoft Entra gratuito e adicionar atribuição de função no nível da assinatura, consulte Criar um aplicativo Microsoft Entra. Você também precisa do ID do diretório (locatário), ID do aplicativo e segredo do cliente.

Adicionar uma conexão de dados de Hubs de Eventos

O exemplo a seguir mostra como adicionar uma conexão de dados de Hubs de Eventos programaticamente. Consulte Conectar-se aos Hubs de Eventos para obter informações sobre como adicionar uma conexão de dados de Hubs de Eventos usando o portal do Azure.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Definição Valor sugerido Descrição do campo
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx O ID do inquilino. Também conhecido como ID de diretório.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID de assinatura que você usa para a criação de recursos.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx A ID do cliente do aplicativo que pode acessar recursos em seu locatário.
clientSecret xxxxxxxxxxxxxx O segredo do cliente do aplicativo que pode acessar recursos em seu locatário.
resourceGroupName Testrg O nome do grupo de recursos que contém o cluster.
clusterName Mykustocluster O nome do cluster.
databaseName mykustodatabase O nome do banco de dados de destino no cluster.
dataConnectionName MyEventHubConnect O nome desejado da sua conexão de dados.
tableName StormEvents O nome da tabela de destino no banco de dados de destino.
mappingRuleName StormEvents_CSV_Mapping O nome do mapeamento de coluna relacionado à tabela de destino.
dataFormat CSV O formato de dados da mensagem.
eventHubResourceId ID do recurso O ID do recurso do hub de eventos que contém os dados para ingestão.
Grupo de consumidores $Default O grupo de consumidores do seu hub de eventos.
localização E.U.A. Central O local do recurso de conexão de dados.
compressão Gzip ou Nenhum O tipo de compressão de dados.

Gerar dados

Veja o aplicativo de exemplo que gera dados e os envia para um hub de eventos.

Um evento pode conter um ou mais registros, até seu limite de tamanho. No exemplo a seguir, enviamos dois eventos, cada um com cinco registros anexados:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Clean up resources (Limpar recursos)

Para excluir a conexão de dados, use o seguinte comando:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Próximos passos