Udostępnij za pośrednictwem


Utwórz aplikację do pobierania danych przy użyciu kolejkowanego pobierania

Dotyczy: ✅Microsoft FabricAzure Data Explorer

Usługa Kusto jest w stanie obsługiwać masowe pobieranie danych przez optymalizację i przetwarzanie wsadowe pozyskanych danych za pośrednictwem menedżera przetwarzania wsadowego. Menedżer wsadowy agreguje pozyskane dane przed dotarciem do tabeli docelowej, co pozwala na bardziej efektywne przetwarzanie i zwiększoną efektywność. Przetwarzanie wsadowe jest zazwyczaj realizowane w partiach po 1 GB danych pierwotnych, 1000 pojedynczych plików lub z domyślnym limitem czasu wynoszącym 5 minut. Zasady dzielenia na partie można aktualizować na poziomach bazy danych i tabeli, co zwykle zmniejsza czas dzielenia na partie i zmniejsza opóźnienie. Aby uzyskać więcej informacji na temat grupowania pozyskiwania, zobacz zasady grupowania pozyskiwania i programową zmianę zasad pozyskiwania danych na poziomie tabeli .

Notatka

Przetwarzanie wsadowe uwzględnia również różne czynniki, takie jak docelowa baza danych i tabela, użytkownik uruchamiający proces ładowania danych oraz różne właściwości związane z tym procesem, takie jak specjalne tagi.

Z tego artykułu dowiesz się, jak wykonywać następujące działania:

Warunki wstępne

Przed rozpoczęciem

  • Użyj jednej z następujących metod, aby utworzyć tabelę MyStormEvents i, ponieważ przesyłana jest tylko niewielka ilość danych, ustaw limit czasu zasad przesyłania danych na 10 sekund.

    1. Utwórz tabelę docelową o nazwie MyStormEvents w bazie danych, uruchamiając pierwszą aplikację w poleceniach zarządzania .
    2. Ustaw limit czasu zasady dzielenia na partie przy pozyskiwaniu na 10 sekund, uruchamiając drugą aplikację w poleceniach zarządzających . Przed uruchomieniem aplikacji zmień wartość limitu czasu na 00:00:10.

    Notatka

    Propagowanie nowych ustawień zasad grupowania do menedżera wsadowego może potrwać kilka minut.

  • Pobierz przykładowy plik danych stormevent.csv. Plik zawiera 1000 rekordów zdarzeń burzowych.

Notatka

W poniższych przykładach przyjęto założenie, że trywialne dopasowanie występuje między kolumnami pozyskanych danych a schematem tabeli docelowej. Jeśli pozyskane dane nie pasują w oczywisty sposób do schematu tabeli, należy użyć mapowania wczytywania danych, aby wyrównać kolumny danych ze schematem tabeli.

Kolejkowanie pliku do wczytywania i zapytań o wyniki

W preferowanym środowisku IDE lub edytorze tekstów utwórz projekt lub plik o nazwie podstawowego ingestowania zgodnie z konwencjami preferowanego języka. Umieść plik stormevent.csv w tej samej lokalizacji co aplikacja.

Notatka

W poniższych przykładach używasz dwóch klientów, jeden do wykonywania zapytań względem klastra, a drugi do pozyskiwania danych do klastra. W przypadku języków, w których biblioteka klienta go obsługuje, oba klienci współdzielą ten sam monit o uwierzytelnienia użytkownika, co powoduje wyświetlenie jednego monitu użytkownika zamiast jednego dla każdego klienta.

Dodaj następujący kod:

  1. Utwórz aplikację kliencką, która łączy się z klastrem i wyświetla liczbę wierszy w tabeli MyStormEvents. Użyjesz tej liczby jako punktu odniesienia do porównania z liczbą wierszy po każdej metodzie przetwarzania. Zastąp symbole zastępcze <your_cluster_uri> i <your_database> odpowiednio swoim identyfikatorem URI klastra i nazwą bazy danych.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Utwórz obiekt konstruktora parametrów połączenia, który definiuje identyfikator URI pozyskiwania danych, jeśli to możliwe, przy użyciu współużytkowania tych samych poświadczeń uwierzytelniania co identyfikator URI klastra. Zastąp symbol zastępczy <your_ingestion_uri> identyfikatorem URI pozyskiwania danych.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Pobierz plik stormevent.csv, dodając go do kolejki wsadowej. Użyj następujących obiektów i właściwości:

    • QueuedIngestClient do tworzenia klienta pozyskiwania danych.
    • IngestionProperties, aby ustawić właściwości ingestii.
    • DataFormat, aby określić format pliku csv.
    • ignore_first_record określić, czy pierwszy wiersz w plikach CSV i podobnych typach plików jest ignorowany, używając następującej logiki:
      • true: pierwszy wiersz jest ignorowany. Użyj tej opcji, aby usunąć wiersz nagłówka z danych tekstowych tabelarycznych.
      • false: pierwszy wiersz jest traktowany jako zwykły wiersz.

    Notatka

    Importowanie obsługuje maksymalny rozmiar pliku 6 GB. Zaleca się wczytywanie plików o wielkości od 100 MB do 1 GB.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Wykonaj zapytanie dotyczące liczby wierszy w tabeli po pozyskaniu pliku i wyświetl ostatni wiersz pozyskany.

    Notatka

    Aby umożliwić ukończenie przetwarzania danych, poczekaj 30 sekund przed wykonaniem zapytania do tabeli. W przypadku języka C# poczekaj 60 sekund, aby umożliwić asynchronicznie dodanie pliku do kolejki pozyskiwania.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Pełny kod powinien wyglądać następująco:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Uruchamianie aplikacji

W konsoli użyj następującego polecenia, aby uruchomić aplikację:

# Change directory to the folder that contains the management commands project
dotnet run .

Powinien zostać wyświetlony wynik podobny do następującego:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Umieść dane w pamięci w kolejkę na potrzeby przetwarzania i zapytaj o wyniki

Dane można pozyskiwać z pamięci, tworząc strumień zawierający dane, a następnie kolejkując je do pozyskiwania.

Na przykład można zmodyfikować aplikację, zastępując pobieranie danych z pliku kodem w następujący sposób:

  1. Dodaj pakiet deskryptora strumienia do importów w górnej części pliku.

    Nie są wymagane żadne dodatkowe pakiety.

  2. Dodaj ciąg w pamięci z danymi do pozyskiwania.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Ustaw właściwości pozyskiwania, aby nie ignorować pierwszego rekordu, ponieważ ciąg w pamięci nie ma wiersza nagłówka.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Załaduj dane z pamięci, dodając je do kolejki wsadowej. Jeśli to możliwe, podaj rozmiar danych pierwotnych.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

Konspekt zaktualizowanego kodu powinien wyglądać następująco:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Po uruchomieniu aplikacji powinien zostać wyświetlony wynik podobny do poniższego. Zwróć uwagę, że po załadowaniu liczba wierszy w tabeli wzrosła o jeden.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Ustaw obiekt blob w kolejce do pozyskiwania i zapytaj o wyniki

Dane można pozyskiwać z obiektów blob usługi Azure Storage, plików usługi Azure Data Lake i plików Amazon S3.

Na przykład można zmodyfikować aplikację, zastępując kod przyjmowania danych z pamięci następującym kodem:

  1. Zacznij od przekazania pliku stormevent.csv na konto przechowywania i tworząc identyfikator URI z uprawnieniami do odczytu, na przykład przy użyciu tokenu SAS dla obiektów blob platformy Azure.

  2. Dodaj pakiet deskryptora obiektów blob do importów w górnej części pliku.

    Nie są wymagane żadne dodatkowe pakiety.

  3. Utwórz deskryptor bloba przy użyciu identyfikatora URI bloba, ustaw właściwości ładowania, a następnie załaduj dane z bloba. Zastąp symbol zastępczy <your_blob_uri> identyfikatorem URI obiektu blob.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

Konspekt zaktualizowanego kodu powinien wyglądać następująco:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Po uruchomieniu aplikacji powinien zostać wyświetlony wynik podobny do poniższego. Zwróć uwagę, że po załadowaniu danych liczba wierszy w tabeli wzrosła o 1 000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Następny krok

metody uwierzytelniania aplikacji