Udostępnij za pośrednictwem


Pozyskiwanie danych z platformy Apache Kafka do usługi Azure Data Explorer

Apache Kafka to rozproszona platforma przesyłania strumieniowego do tworzenia potoków danych przesyłanych strumieniowo w czasie rzeczywistym, które niezawodnie przenoszą dane między systemami lub aplikacjami. Kafka Connect to narzędzie do skalowalnego i niezawodnego przesyłania strumieniowego danych między platformą Apache Kafka i innymi systemami danych. Ujście platformy Kafka kusto służy jako łącznik z platformy Kafka i nie wymaga użycia kodu. Pobierz plik jar łącznika ujścia z repozytorium Git lub centrum łącznika Confluent.

W tym artykule pokazano, jak pozyskiwać dane za pomocą platformy Kafka, korzystając z samodzielnej konfiguracji platformy Docker w celu uproszczenia konfiguracji klastra platformy Kafka i klastra łącznika platformy Kafka.

Aby uzyskać więcej informacji, zobacz repozytorium Git łącznika i specyfikę wersji.

Wymagania wstępne

Tworzenie jednostki usługi Entra firmy Microsoft

Jednostkę usługi Microsoft Entra można utworzyć za pomocą witryny Azure Portal lub programowo, jak w poniższym przykładzie.

Ta jednostka usługi jest tożsamością używaną przez łącznik do zapisywania danych w tabeli w usłudze Kusto. Przyznasz uprawnienia dla tej jednostki usługi w celu uzyskania dostępu do zasobów usługi Kusto.

  1. Zaloguj się do subskrypcji platformy Azure za pomocą interfejsu wiersza polecenia platformy Azure. Następnie uwierzytelnij się w przeglądarce.

    az login
    
  2. Wybierz subskrypcję do hostowania podmiotu zabezpieczeń. Ten krok jest wymagany, gdy masz wiele subskrypcji.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Utwórz jednostkę usługi. W tym przykładzie jednostka usługi nosi nazwę my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z zwróconych danych JSON skopiuj wartości appId, passwordi tenant do użycia w przyszłości.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Utworzono aplikację Microsoft Entra i jednostkę usługi.

Tworzenie tabeli docelowej

  1. W środowisku zapytań utwórz tabelę o nazwie Storms przy użyciu następującego polecenia:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Utwórz odpowiednie mapowanie Storms_CSV_Mapping tabeli dla pozyskanych danych przy użyciu następującego polecenia:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Utwórz zasady dzielenia na partie pozyskiwania danych w tabeli w celu skonfigurowania opóźnienia pozyskiwania w kolejce.

    Napiwek

    Zasady dzielenia na partie pozyskiwania są optymalizatorem wydajności i zawierają trzy parametry. Pierwszy warunek spełnia warunek wyzwala pozyskiwanie do tabeli.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Użyj jednostki usługi w sekcji Tworzenie jednostki usługi Entra firmy Microsoft, aby udzielić uprawnień do pracy z bazą danych.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Uruchamianie laboratorium

Poniższe laboratorium ma na celu zapewnienie doświadczenia w tworzeniu danych, konfigurowaniu łącznika platformy Kafka i przesyłaniu strumieniowego tych danych. Następnie możesz przyjrzeć się pozyskanym danym.

Klonowanie repozytorium git

Sklonuj repozytorium git laboratorium.

  1. Utwórz katalog lokalny na maszynie.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Sklonuj repozytorium.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Zawartość sklonowanego repozytorium

Uruchom następujące polecenie, aby wyświetlić listę zawartości sklonowanego repozytorium:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Ten wynik tego wyszukiwania to:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Przejrzyj pliki w sklonowanym repozytorium

W poniższych sekcjach opisano ważne części plików w drzewie plików.

adx-sink-config.json

Ten plik zawiera plik właściwości ujścia kusto, w którym są aktualizowane szczegółowe informacje o konfiguracji:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Zastąp wartości następujących atrybutów zgodnie z konfiguracją: aad.auth.authority, aad.auth.appid, , aad.auth.appkey( kusto.tables.topics.mapping nazwa bazy danych), i .kusto.query.urlkusto.ingestion.url

Łącznik — Dockerfile

Ten plik zawiera polecenia służące do generowania obrazu platformy Docker dla wystąpienia łącznika. Obejmuje on pobieranie łącznika z katalogu wersji repozytorium Git.

Katalog storm-events-producer

Ten katalog zawiera program języka Go, który odczytuje lokalny plik "StormEvents.csv" i publikuje dane w temacie platformy Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Uruchamianie kontenerów

  1. W terminalu uruchom kontenery:

    docker-compose up
    

    Aplikacja producenta rozpoczyna wysyłanie zdarzeń do tematu storm-events . Powinny zostać wyświetlone dzienniki podobne do następujących dzienników:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Aby sprawdzić dzienniki, uruchom następujące polecenie w osobnym terminalu:

    docker-compose logs -f | grep kusto-connect
    

Uruchamianie łącznika

Użyj wywołania REST platformy Kafka Connect, aby uruchomić łącznik.

  1. W osobnym terminalu uruchom zadanie ujścia za pomocą następującego polecenia:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Aby sprawdzić stan, uruchom następujące polecenie w osobnym terminalu:

    curl http://localhost:8083/connectors/storm/status
    

Łącznik rozpoczyna kolejkowanie procesów pozyskiwania.

Uwaga

Jeśli masz problemy z łącznikiem dzienników, utwórz problem.

Tożsamość zarządzana

Domyślnie łącznik platformy Kafka używa metody aplikacji do uwierzytelniania podczas pozyskiwania. Aby uwierzytelnić się przy użyciu tożsamości zarządzanej:

  1. Przypisz klastrowi tożsamość zarządzaną i przyznaj swojemu kontu magazynu uprawnienia do odczytu. Aby uzyskać więcej informacji, zobacz Pozyskiwanie danych przy użyciu uwierzytelniania tożsamości zarządzanej.

  2. W pliku adx-sink-config.json ustaw aad.auth.strategy managed_identity wartość i upewnij się, że aad.auth.appid ustawiono identyfikator klienta tożsamości zarządzanej (aplikacji).

  3. Użyj tokenu usługi metadanych wystąpienia prywatnego zamiast jednostki usługi Microsoft Entra.

Uwaga

W przypadku korzystania z tożsamości appId zarządzanej i tenant są one wywoływane z kontekstu lokacji wywołań i password nie są potrzebne.

Wykonywanie zapytań i przeglądanie danych

Potwierdzanie pozyskiwania danych

  1. Po dodaniu danych do Storms tabeli potwierdź transfer danych, sprawdzając liczbę wierszy:

    Storms 
    | count
    
  2. Upewnij się, że w procesie pozyskiwania nie występują żadne błędy:

    .show ingestion failures
    

    Gdy zobaczysz dane, wypróbuj kilka zapytań.

Wykonywanie zapytań na danych

  1. Aby wyświetlić wszystkie rekordy, uruchom następujące zapytanie:

    Storms
    | take 10
    
  2. Użyj funkcji where i project , aby filtrować określone dane:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Użyj operatora summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Zrzut ekranu przedstawiający połączone wyniki wykresu kolumnowego zapytania platformy Kafka.

Aby uzyskać więcej przykładów zapytań i wskazówek, zobacz Pisanie zapytań w języku KQL i język zapytań Kusto dokumentacji.

Reset

Aby zresetować, wykonaj następujące czynności:

  1. Zatrzymywanie kontenerów (docker-compose down -v)
  2. Usuń (drop table Storms)
  3. Utwórz ponownie tabelę Storms
  4. Ponowne tworzenie mapowania tabeli
  5. Ponowne uruchamianie kontenerów (docker-compose up)

Czyszczenie zasobów

Aby usunąć zasoby usługi Azure Data Explorer, użyj polecenia az kusto cluster delete (rozszerzenie kusto) lub az kusto database delete (rozszerzenie kusto)::

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Klaster i baza danych można również usunąć za pośrednictwem witryny Azure Portal. Aby uzyskać więcej informacji, zobacz Usuwanie klastra usługi Azure Data Explorer i Usuwanie bazy danych w usłudze Azure Data Explorer.

Dostrajanie łącznika ujścia platformy Kafka

Dostosuj łącznik ujścia platformy Kafka, aby pracować z zasadami dzielenia na partie pozyskiwania:

  • Dostosuj limit rozmiaru ujścia flush.size.bytes platformy Kafka, począwszy od 1 MB, zwiększając się o zwiększenie o 10 MB lub 100 MB.
  • W przypadku korzystania z ujścia platformy Kafka dane są agregowane dwa razy. Dane po stronie łącznika są agregowane zgodnie z ustawieniami opróżniania, a po stronie usługi zgodnie z zasadami dzielenia na partie. Jeśli czas dzielenia na partie jest zbyt krótki, więc nie można pozyskiwać danych zarówno przez łącznik, jak i usługę, należy zwiększyć czas dzielenia na partie. Ustaw rozmiar partii na 1 GB i zwiększ lub zmniejsz o 100 MB przyrostów zgodnie z potrzebami. Jeśli na przykład rozmiar opróżniania wynosi 1 MB, a rozmiar zasad dzielenia na partie wynosi 100 MB, łącznik ujścia platformy Kafka agreguje dane do partii 100 MB. Ta partia jest następnie pozyskiwana przez usługę. Jeśli czas zasad przetwarzania wsadowego wynosi 20 sekund, a łącznik ujścia platformy Kafka opróżnia 50 MB w ciągu 20 sekund, usługa pozyskuje partię 50 MB.
  • Skalowanie można przeprowadzić, dodając wystąpienia i partycje platformy Kafka. Zwiększ tasks.max liczbę partycji. Utwórz partycję, jeśli masz wystarczającą ilość danych, aby utworzyć obiekt blob o rozmiarze flush.size.bytes ustawienia. Jeśli obiekt blob jest mniejszy, partia jest przetwarzana po osiągnięciu limitu czasu, więc partycja nie otrzymuje wystarczającej przepływności. Duża liczba partycji oznacza większe obciążenie związane z przetwarzaniem.