Udostępnij za pośrednictwem


Przewodnik migracji: Elasticsearch do usługi Azure Data Explorer

Z tego przewodnika dowiesz się , jak migrować dane elasticsearch do usługi Azure Data Explorer przy użyciu usługi Logstash.

W tym przewodniku dane, które mają zostać zmigrowane, znajduje się w indeksie Elasticsearch o nazwie vehicle , który ma następujący schemat danych:

{
  "Manufacturer": "string",
  "Model": "string",
  "ReleaseYear": "int",
  "ReleaseDate": "datetime"
}

Wymagania wstępne

Aby przeprowadzić migrację danych elasticsearch do usługi Azure Data Explorer, potrzebne są następujące elementy:

  • Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
  • Baza danych i klaster usługi Azure Data Explorer. Możesz utworzyć bezpłatny klaster lub utworzyć pełny klaster. Aby zdecydować, co jest najlepsze dla Ciebie, sprawdź porównanie funkcji.
  • Identyfikator aplikacji i delegowane uprawnienia dostępu do klastra usługi Azure Data Explorer. Aby uzyskać więcej informacji, zobacz Tworzenie aplikacji Microsoft Entra. Aby skonfigurować potok usługi Logstash, potrzebny jest identyfikator aplikacji, wpis tajny i identyfikator dzierżawy.
  • Logstash w wersji 6+ Instrukcje instalacji.

Przed migracją

Po spełnieniu wymagań wstępnych możesz zapoznać się z topologią środowiska i ocenić możliwość migracji do chmury platformy Azure.

Tworzenie schematu docelowego w klastrze usługi Azure Data Explorer

Aby prawidłowo pozyskiwać i strukturę danych na potrzeby wykonywania zapytań i analizy, należy utworzyć schemat tabeli i mapowanie w klastrze usługi Azure Data Explorer.

Schemat tabeli i migrowanych danych powinny być zgodne. Mapowanie pozyskiwania jest ważne, aby ustanowić mapowanie kolumn źródłowych w ELK na kolumny docelowe w tabeli.

Aby utworzyć schemat tabeli i mapowanie pozyskiwania w klastrze:

  1. Zaloguj się do internetowego interfejsu użytkownika usługi Azure Data Explorer.

  2. Dodaj połączenie z klastrem.

  3. Wybierz bazę danych, w której chcesz utworzyć schemat tabeli dla danych migracji.

  4. Uruchom następujące polecenie w oknie zapytania bazy danych, aby utworzyć schemat tabeli.

    .create tables Vehicle (
      Manufacturer: string,
      Model: string,
      ReleaseYear: int,
      ReleaseDate: datetime
      )
    
  5. Uruchom następujące polecenie, aby utworzyć mapowanie pozyskiwania.

    .create table Vehicle ingestion json mapping 'VechicleMapping'
      '['
      '  {"column":"Manufacturer", "path":"$.manufacturer"},'
      '  {"column":"Model", "path":"$.model"},'
      '  {"column":"ReleaseYear", "path":"$.releaseYear"},'
      '  {"column":"ReleaseDate", "path":"$.releaseDate"}'
      ']'
    

Przygotowywanie usługi Logstash do migracji

Podczas migrowania danych do klastra usługi Azure Data Explorer ważne jest prawidłowe skonfigurowanie potoku usługi Logstash. Potok zapewnia, że dane są poprawnie sformatowane i przesyłane do tabeli docelowej.

Jeśli musisz przenieść dane z wielu klastrów lub indeksów elasticsearch, możesz utworzyć wiele sekcji wejściowych w pliku konfiguracji potoku. Aby to osiągnąć, można zdefiniować jedną sekcję danych wejściowych dla każdego klastra lub indeksu elasticsearch i kategoryzować je przy użyciu tagów, jeśli chcesz. Następnie możesz użyć tych tagów w instrukcjach warunkowych w sekcji danych wyjściowych, aby skierować te zestawy danych do określonych tabel klastra usługi Azure Data Explorer

Aby skonfigurować potok usługi Logstash:

  1. W powłoce poleceń przejdź do katalogu głównego logstash, a następnie uruchom następujące polecenie, aby zainstalować wtyczkę danych wyjściowych usługi Logstash. Aby uzyskać więcej informacji na temat wtyczki, zobacz Pozyskiwanie danych z usługi Logstash.

    bin/logstash-plugin install logstash-output-kusto
    
  2. Utwórz plik konfiguracji potoku usługi Logstash przy użyciu następujących ustawień:

    input {
      elasticsearch {
        hosts => "http://localhost:9200"
        index => "vehicle"
        query => '{ "query": { "range" : { "releaseDate": { "gte": "2019-01-01", "lte": "2023-12-31" }}}}'
        user => "<elasticsearch_username>"
        password => "<elasticsearch_password>"
        ssl => true
        ca_file => "<certification_file>"
      }
    }
    
    filter
    {
      ruby
      {
        code => "event.set('[@metadata][timebucket]', Time.now().to_i/10)"
      }
    }
    
    output {
      kusto {
        path => "/tmp/region1/%{+YYYY-MM-dd}-%{[@metadata][timebucket]}.txt"
        ingest_url => "https://ingest-<azure_data_explorer_cluster_name>.<region>.kusto.windows.net"
        app_id => "<app_id>"
        app_key => "<app_secret>"
        app_tenant => "<app_tenant_id>"
        database => "<your_database>"
        table => "Vehicle" // The table schema you created earlier
        json_mapping => "vehicleMapping" // The ingestion mapping you created earlier
      }
    }
    

    Parametry wejściowe

    Nazwa parametru opis
    Hostów Adres URL klastra Elasticsearch.
    indeks Nazwa indeksu do migracji.
    query Opcjonalne zapytanie w celu pobrania określonych danych z indeksu.
    użytkownik Nazwa użytkownika do nawiązania połączenia z klastrem Elasticsearch.
    hasło Hasło do nawiązania połączenia z klastrem Elasticsearch.
    tagów Opcjonalne tagi identyfikujące źródło danych. Na przykład określ tags => ["vehicle"] w sekcji elasticsearch, a następnie filtruj przy użyciu if "vehicle" in [tags] { ... } opakowującego sekcję kusto.
    ssl Określa, czy wymagany jest certyfikat SSL.
    ca_file Plik certyfikatu do przekazania na potrzeby uwierzytelniania.

    Parametry filtru

    Filtr ruby uniemożliwia pozyskiwanie zduplikowanych danych do klastra przez ustawienie unikatowego znacznika czasu dla plików danych elasticsearch co 10 sekund. Jest to najlepsze rozwiązanie, które dzieli dane na pliki z unikatowym znacznikiem czasu, zapewniając, że dane są prawidłowo przetwarzane na potrzeby migracji.

    Parametry wyjściowe

    Nazwa parametru opis
    path Wtyczka Logstash zapisuje zdarzenia w plikach tymczasowych przed wysłaniem ich do klastra. Ten parametr opisuje ścieżkę do miejsca zapisania plików tymczasowych oraz wyrażenie czasu rotacji plików w celu wyzwolenia przekazywania do klastra.
    ingest_url Punkt końcowy klastra do komunikacji związanej z pozyskiwaniem.
    app_id, app_key i app_tenant Poświadczenia wymagane do nawiązania połączenia z klastrem. Upewnij się, że używasz aplikacji z uprawnieniami pozyskiwania. Aby uzyskać więcej informacji, zobacz Wymagania wstępne.
    database Nazwa bazy danych, w której zostaną umieszczone zdarzenia.
    table Nazwa docelowej tabeli, w której zostaną umieszczone zdarzenia.
    json_mapping Mapowanie służy do mapowania przychodzącego ciągu json zdarzenia na poprawny format wiersza (definiuje właściwość ELK przechodzi do kolumny schematu tabeli).

Migracja

Po zakończeniu przygotowywania kroków przed migracją następnym krokiem jest wykonanie procesu migracji. Ważne jest, aby monitorować potok podczas procesu migracji danych, aby upewnić się, że działa płynnie i aby można było rozwiązać wszelkie problemy, które mogą wystąpić.

Aby przeprowadzić migrację danych, w powłoce poleceń przejdź do katalogu głównego usługi Logstash, a następnie uruchom następujące polecenie:

bin/logstash -f <your_pipeline>.conf

Powinny zostać wyświetlone informacje wydrukowane na ekranie.

Po migracji

Po zakończeniu migracji należy przejść przez szereg zadań po migracji, aby zweryfikować dane i upewnić się, że wszystko działa tak sprawnie i wydajnie, jak to możliwe.

Proces weryfikacji danych dla określonego indeksu zwykle składa się z następujących działań:

Porównanie danych: porównaj zmigrowane dane w klastrze usługi Azure Data Explorer do oryginalnych danych w usłudze Elasticsearch. Można to zrobić przy użyciu narzędzia takiego jak Kibana w stosie ELK, które umożliwia wykonywanie zapytań i wizualizowanie danych w obu środowiskach.

Wykonywanie zapytań: uruchom serię zapytań względem zmigrowanych danych w klastrze usługi Azure Data Explorer, aby upewnić się, że dane są dokładne i kompletne. Obejmuje to uruchamianie zapytań testujących relacje między różnymi polami i zapytań testujących integralność danych.

Sprawdź brakujące dane: porównaj zmigrowane dane w klastrze z danymi w usłudze Elasticsearch, aby sprawdzić brakujące dane, zduplikowane dane lub inne niespójności danych.

Zweryfikuj wydajność: przetestuj wydajność zmigrowanych danych w klastrze i porównaj je z wydajnością danych w usłudze Elasticsearch. Może to obejmować uruchamianie zapytań i wizualizowanie danych w celu przetestowania czasów odpowiedzi i upewnienia się, że dane w klastrze są zoptymalizowane pod kątem wydajności.

Ważne

Powtórz proces weryfikacji danych, jeśli zostaną wprowadzone jakiekolwiek zmiany w zmigrowanych danych lub klastrze, aby upewnić się, że dane są nadal dokładne i kompletne.

Poniżej przedstawiono kilka przykładów zapytań, które można uruchomić, aby zweryfikować dane w klastrze:

  1. W usłudze Elasticsearch uruchom następujące zapytania, aby pobrać element :

    // Gets the total record count of the index
    GET vehicle/_count
    
    // Gets the total record count of the index based on a datetime query
    GET vehicle/_count
    {
      "query": {
        "range" : {
          "releaseDate": { "gte": "2021-01-01", "lte": "2021-12-31" }
                  }
              }
    }
    
    // Gets the count of all vehicles that has manufacturer as "Honda".
    GET vehicle/_count
    {
      "query": {
        "bool" : {
          "must" : {
            "term" : { "manufacturer" : "Honda" }
          }
        }
      }
    }
    
    // Get the record count where a specific property doesn't exist.
    // This is helpful especially when some records don't have NULL properties.
    GET vehicle/_count
    {
      "query": {
        "bool": {
          "must_not": {
            "exists": {
              "field": "description"
            }
          }
        }
      }
    }
    
  2. W oknie zapytania bazy danych uruchom następujące odpowiednie zapytanie:

    // Gets the total record count in the table
    Vehicle
    | count
    
    // Gets the total record count where a given property is NOT empty/null
    Vehicle
    | where isnotempty(Manufacturer)
    
    // Gets the total record count where a given property is empty/null
    Vehicle
    | where isempty(Manufacturer)
    
    // Gets the total record count by a property value
    Vehicle
    | where Manufacturer == "Honda"
    | count
    
  3. Porównaj wyniki z obu zestawów zapytań, aby upewnić się, że dane w klastrze są dokładne i kompletne.

Aby dowiedzieć się więcej o Eksploratorze usługi Azure Database, zobacz:

Aby dowiedzieć się więcej o cyklu struktury i wdrażania migracji do chmury, zobacz: