Ingerir dados com a biblioteca do Azure Data Explorer Node
O Azure Data Explorer é um serviço de exploração de dados rápido e altamente dimensionável para dados telemétricos e de registo. O Azure Data Explorer oferece duas bibliotecas de cliente para o Node: uma biblioteca de ingestão e uma biblioteca de dados. Estas bibliotecas permitem ingerir (carregar) dados para um cluster e consultar dados a partir do código. Neste artigo, vai criar primeiro uma tabela e um mapeamento de dados num cluster de teste. Em seguida, vai colocar em fila a ingestão para o cluster e validar os resultados.
Se não tiver uma subscrição do Azure, crie uma conta do Azure gratuita antes de começar.
Pré-requisitos
- Uma conta Microsoft ou uma identidade de utilizador Microsoft Entra. Não é necessária uma subscrição do Azure.
- Um cluster e uma base de dados do Azure Data Explorer. Criar um cluster e uma base de dados.
- Node.js instalado no seu computador de desenvolvimento
Instalar as bibliotecas de dados e de ingestão
Instalar o azure-kusto-ingest e o azure-kusto-data
npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2
Adicionar declarações e constantes de importação
Importar classes das bibliotecas
const { Client: KustoClient, KustoConnectionStringBuilder } = require('azure-kusto-data');
const {
IngestClient: KustoIngestClient,
IngestionProperties,
IngestionDescriptors,
DataFormat,
IngestionMappingKind,
} = require("azure-kusto-ingest");
Para autenticar uma aplicação, o Azure Data Explorer utiliza o seu ID de inquilino Microsoft Entra. Para localizar o seu ID de inquilino, siga Localizar o seu ID de inquilino do Microsoft 365.
Defina os valores para authorityId
, kustoUri
, kustoIngestUri
e kustoDatabase
antes de executar este código.
const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase = "Weather";
Agora construa a cadeia de ligação. Este exemplo utiliza a autenticação do dispositivo para aceder ao cluster. Verifique o resultado da consola para concluir a autenticação. Também pode utilizar um Microsoft Entra certificado de aplicação, chave de aplicação e utilizador e palavra-passe.
Irá criar a tabela de destino e o mapeamento num passo posterior.
const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";
Definir as informações do ficheiro de origem
Importe mais classes e defina constantes para o ficheiro de origem de dados. Este exemplo utiliza um ficheiro de exemplo alojado no Armazenamento de Blobs do Azure. O conjunto de dados de exemplo StormEvents contém dados relacionados com as condições meteorológicas dos Centros Nacionais de Informação Ambiental.
const container = "samplefiles";
const account = "kustosamples";
const sas = ""; // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;
Criar uma tabela no cluster de teste
Crie uma tabela que corresponda ao esquema dos dados no ficheiro StormEvents.csv
. Quando este código é executado, devolve uma mensagem semelhante à seguinte: Para iniciar sessão, utilize um browser para abrir a página https://microsoft.com/devicelogin e introduza o código XXXXXXXXX para autenticar. Siga os passos para iniciar sessão e regresse para executar o próximo bloco de código. Os blocos de código subsequentes que estabelecerem uma ligação terão de iniciar sessão novamente.
const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;
const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);
Definir o mapeamento de ingestão
Mapeie os dados recebidos do CSV para os nomes de coluna e tipos de dados utilizados ao criar a tabela.
const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;
const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);
Colocar uma mensagem em fila para ingestão
Coloque uma mensagem em fila para extrair dados do armazenamento de blobs e ingerir esses dados para o Azure Data Explorer.
const defaultProps = new IngestionProperties({
database: kustoDatabase,
table: destTable,
format: DataFormat.CSV,
ingestionMappingReference: destTableMapping,
ingestionMappingKind: IngestionMappingKind.CSV,
additionalProperties: {ignoreFirstRecord: true},
});
const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
const blobDesc = new BlobDescriptor(blobPath, 10);
try {
const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
// Handle errors
}
Valide se a tabela contém dados
Valide se os dados foram ingeridos para a tabela. Aguarde cinco a dez minutos para que a ingestão colocada em fila agende a ingestão e carregue os dados para o Azure Data Explorer. Em seguida, execute o seguinte código para obter a contagem de registos na tabela StormEvents
.
const query = `${destTable} | count`;
var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);
Executar consultas de resolução de problemas
Inicie sessão no https://dataexplorer.azure.com e ligue ao cluster. Execute o seguinte comando na base de dados para ver se ocorreram quaisquer falhas de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Execute o seguinte comando para ver o estado de todas as operações de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpar os recursos
Se planeia seguir os nossos outros artigos, mantenha os recursos que criou. Caso contrário, execute o seguinte comando na base de dados para limpar a tabela StormEvents
.
.drop table StormEvents