Sdílet prostřednictvím


Vytvořte aplikaci pro získání dat pomocí frontovaného příjmu

Platí pro: ✅Microsoft FabricAzure Data Explorer

Kusto dokáže zpracovat hromadný příjem dat optimalizací a dávkováním přijatých dat prostřednictvím svého správce dávkování. Manažer dávkování agreguje přijatá data předtím, než dosáhnou cílové tabulky, což umožňuje efektivnější zpracování a zvýšený výkon. Dávkování se obvykle provádí ve skupinách po 1 GB nezpracovaných dat, po 1 000 jednotlivých souborech, nebo je ve výchozím nastavení časový limit 5 minut. Zásady dávkování je možné aktualizovat na úrovni databáze a tabulky, což obvykle snižuje dobu dávkování a snižuje latenci. Další informace o dávkování příjmu najdete v tématu Zásady dávkového příjmu a Změnit zásady dávkového příjmu na úrovni tabulky programově.

Poznámka

Dávkování také bere v úvahu různé faktory, jako je cílová databáze a tabulka, uživatel, který spouští příjem dat, a různé vlastnosti spojené s příjmem dat, jako jsou speciální značky.

V tomto článku se naučíte:

Požadavky

Než začnete

  • Pomocí jedné z následujících metod vytvořte MyStormEvents tabulku a vzhledem k tomu, že se ingestuje jenom malé množství dat, nastavte časový limit zásad dávkování příjmu na 10 sekund:

    1. Vytvořte cílovou tabulku s názvem MyStormEvents ve vaší databázi spuštěním první aplikace v příkazů pro správu.
    2. Nastavte časový limit zásady dávkování ingesce na 10 sekund tak, že spustíte druhou aplikaci v rámci příkazů pro správu. Před spuštěním aplikace změňte hodnotu časového limitu na 00:00:10.

    Poznámka

    Rozšíření nového nastavení zásad dávkování do správce dávek může trvat několik minut.

  • Stáhněte si ukázkový datový soubor stormevent.csv. Soubor obsahuje 1 000 záznamů bouřkových událostí.

Poznámka

Následující příklady předpokládají triviální shodu mezi sloupci přijatých dat a schématem cílové tabulky. Pokud ingestované data triviálně neodpovídají schématu tabulky, musíte k zarovnání sloupců dat se schématem tabulky použít mapování příjmu dat.

Zařadíte soubor do fronty pro příjem dat a odešlete dotaz na výsledky.

Ve vámi preferovaném integrovaném vývojovém prostředí (IDE) nebo textovém editoru vytvořte projekt nebo soubor s názvem základní příjem dat pomocí konvence vhodné pro váš preferovaný jazyk. Umístěte stormevent.csv soubor do stejného umístění jako vaše aplikace.

Poznámka

V následujících příkladech používáte dva klienty, jeden na dotazování vašeho clusteru a druhý na vložení dat do vašeho clusteru. Pro jazyky, ve kterých ji klientská knihovna podporuje, sdílejí oba klienti stejný ověřovací program výzvy uživatele, což vede k zobrazení výzvy jednoho uživatele místo jednoho pro každého klienta.

Přidejte následující kód:

  1. Vytvořte klientskou aplikaci, která se připojí ke clusteru, a vytiskne počet řádků v tabulce MyStormEvents. Tento počet použijete jako základní hodnotu pro porovnání s počtem řádků po každé metodě ingestu. Nahraďte zástupné symboly <your_cluster_uri> a <your_database> identifikátorem URI clusteru a názvem databáze.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Vytvořte objekt tvůrce připojovacích řetězců, který definuje identifikátor URI příjmu dat, pokud je to možné, pomocí sdílení stejných přihlašovacích údajů pro ověření jako identifikátor URI clusteru. Nahraďte zástupný symbol <your_ingestion_uri> adresou URI pro příjem dat.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Přidejte soubor stormevent.csv do dávkové fronty pro ingestování. Použijete následující objekty a vlastnosti:

    • QueuedIngestClient pro vytvoření klienta ingestu.
    • pro příjem dat nastavit vlastnosti příjmu dat.
    • DataFormat určit formát souboru jako CSV.
    • ignore_first_record určit, jestli je první řádek ve formátu CSV a dalších podobných typů souborů ignorován pomocí následující logiky:
      • true: První řádek se ignoruje. Tato možnost slouží k přetažení řádku záhlaví z tabulkových textových dat.
      • False: První řádek je zahrnut jako běžný řádek.

    Poznámka

    Příjem dat podporuje maximální velikost souboru 6 GB. Doporučujeme ingestovat soubory mezi 100 MB a 1 GB.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Po ingestování souboru zadejte dotaz na počet řádků v tabulce a zobrazte poslední přijatý řádek.

    Poznámka

    Pokud chcete umožnit dokončení příjmu dat, počkejte 30 sekund před dotazováním tabulky. Počkejte 60 sekund v jazyce C#, abyste umožnili asynchronní přidání souboru do fronty příjmu dat.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Celý kód by měl vypadat takto:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Spuštění aplikace

V příkazovém prostředí spusťte aplikaci pomocí následujícího příkazu:

# Change directory to the folder that contains the management commands project
dotnet run .

Měl by se zobrazit výsledek podobný následujícímu:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Fronta dat v paměti pro příjem dat a dotazování výsledků

Data z paměti můžete ingestovat vytvořením datového proudu obsahujícího data a následným řazením do fronty pro příjem dat.

Například můžete upravit aplikaci tak, že nahradíte importování z kódu souboru takto:

  1. Přidejte balíček popisovače streamu do importů v horní části souboru.

    Nejsou vyžadovány žádné další balíčky.

  2. Přidejte do paměti řetězec s daty pro zpracování.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Nastavte vlastnosti příjmu dat tak, aby neignorovaly první záznam, protože řetězec v paměti neobsahuje řádek záhlaví.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Ingestování dat v paměti jejich přidáním do dávkové fronty Pokud je to možné, zadejte velikost nezpracovaných dat.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

Přehled aktualizovaného kódu by měl vypadat takto:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o jeden.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Vytvoření fronty objektu blob pro příjem dat a dotazování výsledků

Můžete ingestovat data z objektů blob služby Azure Storage, souborů Azure Data Lake a souborů Amazon S3.

Aplikaci můžete například upravit tak, že ingestujete z paměti kód následujícím kódem:

  1. Začněte tím, že nahrajete soubor stormevent.csv do účtu úložiště a vygenerujete identifikátor URI s oprávněními ke čtení, například pomocí tokenu SAS pro objekty blob Azure.

  2. Přidejte balíček popisovače objektů blob do importů v horní části souboru.

    Nejsou vyžadovány žádné další balíčky.

  3. Pomocí identifikátoru URI objektu blob vytvořte popisovač objektů blob, nastavte vlastnosti příjmu dat a pak ingestujte data z objektu blob. Nahraďte zástupný symbol <your_blob_uri> identifikátorem URI blobu.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

Přehled aktualizovaného kódu by měl vypadat takto:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o 1 000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Další krok