Gegevens opnemen met behulp van de Node-bibliotheek voor Azure Data Explorer
Azure Data Explorer is een snelle en zeer schaalbare service voor gegevensverkenning voor telemetrische gegevens en gegevens uit logboeken. Azure Data Explorer biedt twee clientbibliotheken voor Node: een ingest library en een data library (beide Engelstalig). Met deze bibliotheken kunt u gegevens opnemen (laden) in een cluster en gegevens bevragen vanuit uw code. In dit artikel maakt u eerst een tabel en gegevenstoewijzing in een testcluster. Vervolgens plaatst u op te nemen gegevens in de wachtrij en valideert u de resultaten.
Als u nog geen abonnement op Azure hebt, maak dan een gratis Azure-account aan voordat u begint.
Vereisten
- Een Microsoft-account of een Microsoft Entra gebruikersidentiteit. Er is geen Azure-abonnement vereist.
- Een Azure Data Explorer-cluster en -database. Maak een cluster en database.
- Node.js geïnstalleerd op uw ontwikkelcomputer
De bibliotheken data en ingest installeren
Installeer azure-kusto-ingest en azure-kusto-data
npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2
Importinstructies en constanten toevoegen
Importeer klassen uit de bibliotheken
const { Client: KustoClient, KustoConnectionStringBuilder } = require('azure-kusto-data');
const {
IngestClient: KustoIngestClient,
IngestionProperties,
IngestionDescriptors,
DataFormat,
IngestionMappingKind,
} = require("azure-kusto-ingest");
Azure Data Explorer gebruikt uw Microsoft Entra tenant-id om een toepassing te verifiëren. Als u uw tenant-id wilt vinden, volgt u Uw Microsoft 365-tenant-id zoeken.
Stel de waarden voor authorityId
, kustoUri
, kustoIngestUri
en kustoDatabase
in voordat u deze code uitvoert.
const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase = "Weather";
Nu maakt u de verbindingsreeks. In dit voorbeeld wordt apparaatverificatie gebruikt voor toegang tot het cluster. Controleer de console-uitvoer om de verificatie te voltooien. U kunt ook een Microsoft Entra toepassingscertificaat, toepassingssleutel en gebruiker en wachtwoord gebruiken.
U maakt de doeltabel en toewijzing in een latere stap.
const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";
Gegevens van bronbestand instellen
Importeer meer klassen en stel constanten in voor het gegevensbronbestand. In dit voorbeeld wordt een voorbeeldbestand gebruikt dat wordt gehost in Azure Blob Storage. De StormEvents-voorbeeldgegevensset bevat weergerelateerde gegevens van de National Centers for Environmental Information.
const container = "samplefiles";
const account = "kustosamples";
const sas = ""; // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;
Een tabel maken in het testcluster
Maak een tabel die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv
. Wanneer deze code wordt uitgevoerd, wordt er een bericht als het volgende geretourneerd: Als u zich wilt aanmelden, opent u de pagina https://microsoft.com/devicelogin met een webbrowser. Voer de code XXXXXXXXX in om te verifiëren. Volg de stappen om u aan te melden en ga vervolgens terug om het volgende codeblok uit te voeren. Als volgende codeblokken verbinding moeten maken, moet u zich opnieuw aanmelden.
const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;
const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);
Toewijzing van opname definiëren
Wijs binnenkomende CSV-gegevens toe aan de kolomnamen en gegevenstypen die zijn gebruikt bij het maken van de tabel.
const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;
const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);
Bericht in wachtrij zetten voor opname
Zet een bericht in de wachtrij om gegevens op te halen uit blob-opslag en die gegevens op te nemen in Azure Data Explorer.
const defaultProps = new IngestionProperties({
database: kustoDatabase,
table: destTable,
format: DataFormat.CSV,
ingestionMappingReference: destTableMapping,
ingestionMappingKind: IngestionMappingKind.CSV,
additionalProperties: {ignoreFirstRecord: true},
});
const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
const blobDesc = new BlobDescriptor(blobPath, 10);
try {
const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
// Handle errors
}
Valideren dat de tabel gegevens bevat
Valideer dat er gegevens in de tabel zijn opgenomen. Wacht vijf tot tien minuten totdat de opname in de wachtrijde opname heeft gepland en de gegevens in Azure Data Explorer zijn geladen. Voer vervolgens de volgende code uit om het aantal records in de tabel StormEvents
te bepalen.
const query = `${destTable} | count`;
var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);
Query's voor probleemoplossing uitvoeren
Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster. Voer de volgende opdracht uit in uw database om te zien of er in de afgelopen vier uur fouten zijn opgetreden tijdens het opnemen van gegevens. Vervang de naam van de database voordat u de opdracht uitvoert.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Voer de volgende opdracht uit om de status op te vragen van alle bewerkingen voor het opnemen van gegevens van de afgelopen vier uur. Vervang de naam van de database voordat u de opdracht uitvoert.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Resources opschonen
Als u van plan bent onze andere artikelen te volgen, behoudt u de resources die u hebt gemaakt. Voer anders de volgende opdracht uit in uw database om de tabel StormEvents
op te schonen.
.drop table StormEvents