Dela via


Skapa en app för att hämta data med hjälp av köad inmatning

Gäller för: ✅Microsoft FabricAzure Data Explorer

Kusto kan hantera massdataintag genom att optimera och batchinmata inmatade data via batchhanteringshanteraren. Batchhanteraren aggregerar inmatade data innan de når måltabellen, vilket möjliggör effektivare bearbetning och bättre prestanda. Batchbearbetning görs vanligtvis i partier om 1 GB rådata, 1 000 enskilda filer eller en standardtidsgräns på 5 minuter. Batchbearbetningsprinciper kan uppdateras på databas- och tabellnivå, vanligtvis för att sänka batchtiden och minska svarstiden. För mer information om batchinmatning, se IngestionBatching policy och Ändra tabellnivåns batchinmatningspolicy programmatiskt.

Anteckning

Batchbearbetning tar också hänsyn till olika faktorer, till exempel måldatabasen och tabellen, användaren som kör inmatningen och olika egenskaper som är associerade med inmatningen, till exempel särskilda taggar.

I den här artikeln lär du dig att:

Förutsättningar

Innan du börjar

  • Använd någon av följande metoder för att skapa MyStormEvents-tabellen och, eftersom endast en liten mängd data matas in, anger du tidsgränsen för inmatningsbatchprincipen till 10 sekunder:

    1. Skapa en måltabell med namnet MyStormEvents i databasen genom att köra den första appen i hanteringskommandon.
    2. Ange tidsgränsen för principen för batchning av inmatning till 10 sekunder genom att köra den andra appen med hanteringskommandon . Innan du kör appen ändrar du timeout-värdet till 00:00:10.

    Anteckning

    Det kan ta några minuter innan de nya principinställningarna för batchbearbetning sprids till batchhanteraren.

  • Ladda ned stormevent.csv exempeldatafil. Filen innehåller 1 000 stormhändelser.

Not

I följande exempel förutsätts en trivial matchning mellan kolumnerna i inmatade data och schemat för måltabellen. Om inmatade data inte matchar tabellschemat så måste du använda en inmatningsmappning för att justera datakolumnerna med tabellschemat.

Köa en fil för inmatning och fråga efter resultaten

I önskad IDE eller textredigerare skapar du ett projekt eller en fil med namnet grundläggande inmatning med den konvention som är lämplig för det språk du föredrar. Placera stormevent.csv-filen på samma plats som din app.

Obs

I följande exempel använder du två klienter, en för att fråga klustret och den andra för att mata in data i klustret. För språk där klientbiblioteket har stöd för detta delar båda klienterna samma autentisering med användaruppmaning, vilket resulterar i en enda användaruppmaning i stället för en för varje klient.

Lägg till följande kod:

  1. Skapa en klientapp som ansluter till klustret och skriver ut antalet rader i MyStormEvents tabell. Du använder det här antalet som baslinje för jämförelse med antalet rader efter varje metod för inmatning. Ersätt platshållarna <your_cluster_uri> och <your_database> med klustrets URI respektive databasnamn.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Skapa en anslutningssträngsbyggare som definierar URI:n för datainmatning och använder samma autentiseringsuppgifter som kluster-URI:n där det är möjligt. Ersätt platshållaren <your_ingestion_uri> med URI för datainmatning.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Mata in stormevent.csv-filen genom att lägga till den i batchkön. Du använder följande objekt och egenskaper:

    • QueuedIngestClient för att skapa inmatningsklienten.
    • IngestionProperties för att ange inmatningsegenskaperna.
    • DataFormat för att ange filformatet som CSV-.
    • ignore_first_record för att ange om den första raden i CSV och liknande filtyper ignoreras med hjälp av följande logik:
      • True: Den första raden ignoreras. Använd det här alternativet om du vill släppa rubrikraden från tabelltextdata.
      • False: Den första raden matas in som en vanlig rad.

    Obs

    Inmatning stöder en maximal filstorlek på 6 GB. Rekommendationen är att mata in filer mellan 100 MB och 1 GB.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Fråga efter antalet rader i tabellen efter att filen har matats in och visa den sista raden som har matats in.

    Obs

    Om du vill tillåta att inmatningen slutförs väntar du 30 sekunder innan du kör frågor mot tabellen. För C# väntar du 60 sekunder för att tillåta tid för att lägga till filen i inmatningskön asynkront.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Den fullständiga koden bör se ut så här:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Kör din app

I ett kommandogränssnitt använder du följande kommando för att köra appen:

# Change directory to the folder that contains the management commands project
dotnet run .

Du bör se ett resultat som liknar följande:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Lagra minnesintern data för inmatning och sök efter resultaten

Du kan mata in data från minnet genom att skapa en ström som innehåller data och sedan köa den för inmatning.

Du kan till exempel ändra appen genom att ersätta inmatning från fil-kod på följande sätt:

  1. Lägg till stream descriptor-paketet i importerna överst i filen.

    Inga ytterligare paket krävs.

  2. Lägg till en minnesintern sträng med data som ska matas in.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Ställ in importeringsegenskaperna så att den första posten inte ignoreras eftersom strängen i minnet inte har någon rubrikrad.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Mata in internminnedata genom att lägga till den i batch-kön. Ange om möjligt storleken på rådata.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

En disposition av den uppdaterade koden bör se ut så här:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

När du kör appen bör du se ett resultat som liknar följande. Observera att efter inmatningen ökade antalet rader i tabellen med en.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Schemalägg en blob för inmatning och hämta resultaten

Du kan mata in data från Azure Storage-blobar, Azure Data Lake-filer och Amazon S3-filer.

Till exempel kan du modifiera appen genom att ersätta inmatningen från minneskoden med följande:

  1. Börja med att ladda upp stormevent.csv-filen till ditt lagringskonto och generera en URI med läsbehörighet, till exempel med hjälp av en SAS-token för Azure-blobbar.

  2. Lägg till blobdeskriptorpaketet till importerna överst i filen.

    Inga ytterligare paket krävs.

  3. Skapa en blobbeskrivning med hjälp av blob-URI:n, ange inmatningsegenskaperna och mata sedan in data från bloben. Ersätt platshållaren <your_blob_uri> med blob-URI:n.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

En disposition av den uppdaterade koden bör se ut så här:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

När du kör appen bör du se ett resultat som liknar följande. Observera att efter inmatningen ökade antalet rader i tabellen med 1 000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Nästa steg