Configurar a ingestão de dados da transmissão em fluxo no cluster do Azure Data Explorer
Artigo
A ingestão de transmissão em fluxo é útil para carregar dados quando precisa de baixa latência entre a ingestão e a consulta. Considere utilizar a ingestão de transmissão em fluxo nos seguintes cenários:
É necessária latência inferior a um segundo.
Para otimizar o processamento operacional de muitas tabelas em que o fluxo de dados para cada tabela é relativamente pequeno (alguns registos por segundo), mas o volume geral de ingestão de dados é elevado (milhares de registos por segundo).
Se o fluxo de dados para cada tabela for elevado (mais de 4 GB por hora), considere utilizar a ingestão em lote.
Escolher o tipo de ingestão de transmissão em fluxo adequado
São suportados dois tipos de ingestão de transmissão em fluxo:
Tipo de ingestão
Description
Ligação de dados
As ligações de dados do Hub de Eventos, do Hub IoT e do Event Grid podem utilizar a ingestão de transmissão em fluxo, desde que esteja ativada ao nível do cluster. A decisão de utilizar a ingestão de transmissão em fluxo é tomada de acordo com a política de ingestão de transmissão em fluxo configurada na tabela de destino. Para obter informações sobre a gestão de ligações de dados, veja Hub de Eventos, Hub IoT e Event Grid.
Ingestão personalizada
A ingestão personalizada requer que escreva uma aplicação que utilize uma das bibliotecas de cliente do Azure Data Explorer. Utilize as informações neste tópico para configurar a ingestão personalizada. Também pode considerar a aplicação de exemplo de ingestão de transmissão em fluxo C# útil.
Utilize a tabela seguinte para o ajudar a escolher o tipo de ingestão adequado para o seu ambiente:
Critério
Ligação de dados
Ingestão Personalizada
Atraso de dados entre a iniciação da ingestão e os dados disponíveis para consulta
Atraso mais longo
Atraso mais curto
Sobrecarga de desenvolvimento
Configuração rápida e fácil, sem sobrecarga de desenvolvimento
Sobrecarga de desenvolvimento elevada para criar uma aplicação que ingere os dados, processe erros e garanta a consistência dos dados
Nota
Pode gerir o processo para ativar e desativar a ingestão de transmissão em fluxo no cluster com o portal do Azure ou programaticamente em C#. Se estiver a utilizar C# para a sua aplicação personalizada, poderá considerar mais conveniente utilizar a abordagem programática.
Os principais contribuidores que podem afetar a ingestão de transmissão em fluxo são:
Tamanho da VM e do cluster: desempenho da ingestão de transmissão em fluxo e dimensionamentos de capacidade com tamanhos de VM e cluster aumentados. O número de pedidos de ingestão simultâneos está limitado a seis por núcleo. Por exemplo, para SKUs de 16 núcleos, como D14 e L16, a carga máxima suportada é de 96 pedidos de ingestão simultâneos. Para dois SKUs principais, como D11, a carga máxima suportada é de 12 pedidos de ingestão simultâneos.
Limite de tamanho dos dados: o limite de tamanho dos dados para um pedido de ingestão de transmissão em fluxo é de 4 MB. Isto inclui quaisquer dados criados para políticas de atualização durante a ingestão.
Atualizações de esquema: as atualizações de esquema, como a criação e modificação de tabelas e mapeamentos de ingestão, podem demorar até cinco minutos para o serviço de ingestão de transmissão em fluxo. Para obter mais informações, veja Ingestão de transmissão em fluxo e alterações de esquema.
Capacidade SSD: ativar a ingestão de transmissão em fluxo num cluster, mesmo quando os dados não são ingeridos através da transmissão em fluxo, utiliza parte do disco SSD local das máquinas de cluster para transmissão em fluxo de dados de ingestão e reduz o armazenamento disponível para cache frequente.
Ativar a ingestão de transmissão em fluxo no cluster
Na portal do Azure, aceda ao cluster do Azure Data Explorer.
Em Definições, selecione Configurações.
No painel Configurações , selecione Ativado para ativar a Ingestão de transmissão em fluxo.
Selecione Guardar.
Pode ativar a ingestão de transmissão em fluxo ao criar um novo cluster do Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Criar uma tabela de destino e definir a política
Crie uma tabela para receber os dados de ingestão de transmissão em fluxo e defina a política relacionada com a portal do Azure ou programaticamente em C#.
Para criar a tabela que irá receber os dados através da ingestão de transmissão em fluxo, copie o seguinte comando para o painel Consulta e selecioneExecutar.
Copie um dos seguintes comandos para o painel Consulta e selecioneExecutar. Isto define a política de ingestão de transmissão em fluxo na tabela que criou ou na base de dados que contém a tabela.
Dica
Uma política definida ao nível da base de dados aplica-se a todas as tabelas existentes e futuras na base de dados. Quando ativa a política ao nível da base de dados, não é necessário ativá-la por tabela.
Para definir a política na tabela que criou, utilize:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Criar uma aplicação de ingestão de transmissão em fluxo para ingerir dados no cluster
Crie a sua aplicação para ingerir dados no cluster com o seu idioma preferido.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Desativar a ingestão de transmissão em fluxo no cluster
Aviso
A desativação da ingestão de transmissão em fluxo pode demorar algumas horas.
Antes de desativar a ingestão de transmissão em fluxo no cluster do Azure Data Explorer, remova a política de ingestão de transmissão em fluxo de todas as tabelas e bases de dados relevantes. A remoção da política de ingestão de transmissão em fluxo aciona a reorganização de dados no cluster do Azure Data Explorer. Os dados de ingestão de transmissão em fluxo são movidos do armazenamento inicial para o armazenamento permanente no arquivo de colunas (extensões ou partições horizontais). Este processo pode demorar entre alguns segundos a algumas horas, dependendo da quantidade de dados no armazenamento inicial.
Remover a política de ingestão de transmissão em fluxo
Pode remover a política de ingestão de transmissão em fluxo com o portal do Azure ou programaticamente em C#.
No portal do Azure, aceda ao cluster do Azure Data Explorer e selecione Consulta.
Para remover a política de ingestão de transmissão em fluxo da tabela, copie o seguinte comando para o painel Consulta e selecioneExecutar.
.delete table TestTable policy streamingingestion
Em Definições, selecione Configurações.
No painel Configurações , selecione Desativar para desativar a ingestão de transmissão em fluxo.
Selecione Guardar.
Para remover a política de ingestão de transmissão em fluxo da tabela, execute o seguinte código:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Para desativar a ingestão de transmissão em fluxo no cluster, execute o seguinte código:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Limitações
Os mapeamentos de dados têm de ser pré-criados para utilização na ingestão de transmissão em fluxo. Os pedidos individuais de ingestão de transmissão em fluxo não acomodam mapeamentos de dados inline.
Atualizar política. A política de atualização só pode referenciar os dados recém-ingeridos na tabela de origem e não quaisquer outros dados ou tabelas na base de dados.
Se a ingestão de transmissão em fluxo estiver ativada num cluster utilizado como líder para bases de dados de seguidores, a ingestão de transmissão em fluxo tem de ser ativada nos seguintes clusters, bem como para seguir os dados de ingestão de transmissão em fluxo. O mesmo se aplica se os dados do cluster são partilhados através de Data Share.