Sdílet prostřednictvím


Ingestování dat z Apache Kafka do Azure Data Exploreru

Apache Kafka je distribuovaná platforma streamování pro vytváření datových kanálů streamování v reálném čase, které spolehlivě přesouvají data mezi systémy nebo aplikacemi. Kafka Connect je nástroj pro škálovatelné a spolehlivé streamování dat mezi Apache Kafka a dalšími datovými systémy. Jímka Kusto Kafka slouží jako konektor ze systému Kafka a nevyžaduje použití kódu. Stáhněte soubor JAR konektoru jímky z úložiště Git nebo centra konektorů Confluent.

Tento článek ukazuje, jak ingestovat data pomocí systému Kafka pomocí samostatného nastavení Dockeru pro zjednodušení nastavení clusteru Kafka a clusteru konektoru Kafka.

Další informace najdete v úložišti Git a konkrétních verzích konektoru.

Požadavky

Vytvoření instančního objektu Microsoft Entra

Instanční objekt Microsoft Entra je možné vytvořit prostřednictvím webu Azure Portal nebo programově, jako v následujícím příkladu.

Tento instanční objekt je identita, kterou konektor používá k zápisu dat do tabulky v Kusto. Udělíte oprávnění pro tento instanční objekt pro přístup k prostředkům Kusto.

  1. Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.

    az login
    
  2. Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z vrácených dat JSON zkopírujte appIdpasswordtenant a pro budoucí použití.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Vytvořili jste aplikaci Microsoft Entra a instanční objekt.

Vytvoření cílové tabulky

  1. V prostředí dotazu vytvořte tabulku s názvem Storms pomocí následujícího příkazu:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Pomocí následujícího příkazu vytvořte odpovídající mapování Storms_CSV_Mapping tabulky pro ingestované data:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Vytvořte zásadu dávkování příjmu dat v tabulce pro konfigurovatelnou latenci příjmu dat ve frontě.

    Tip

    Zásady dávkování příjmu dat jsou optimalizátor výkonu a obsahují tři parametry. První podmínka, která splňuje, aktivuje příjem dat do tabulky.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Pomocí instančního objektu z vytvoření instančního objektu Microsoft Entra udělte oprávnění pro práci s databází.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Spuštění testovacího prostředí

Následující cvičení je navržené tak, aby vám poskytlo prostředí pro vytváření dat, nastavení konektoru Kafka a streamování těchto dat. Pak se můžete podívat na ingestované data.

Klonování úložiště Git

Naklonujte úložiště Git testovacího prostředí.

  1. Na počítači vytvořte místní adresář.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Naklonujte úložiště.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Obsah klonovaného úložiště

Spuštěním následujícího příkazu zobrazte seznam obsahu klonovaného úložiště:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Tento výsledek hledání:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Kontrola souborů v naklonovaném úložišti

Následující části popisují důležité části souborů ve stromu souborů.

adx-sink-config.json

Tento soubor obsahuje soubor vlastností jímky Kusto, kde aktualizujete konkrétní podrobnosti konfigurace:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Nahraďte hodnoty následujících atributů podle nastavení: , , , kusto.tables.topics.mapping (název databáze) kusto.ingestion.urla kusto.query.url. aad.auth.appkeyaad.auth.appidaad.auth.authority

Konektor – Dockerfile

Tento soubor obsahuje příkazy pro vygenerování image Dockeru pro instanci konektoru. Zahrnuje stažení konektoru z adresáře verze úložiště Git.

Adresář Storm-events-producer

Tento adresář má program Go, který čte místní soubor "StormEvents.csv" a publikuje data do tématu Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Spuštění kontejnerů

  1. V terminálu spusťte kontejnery:

    docker-compose up
    

    Aplikace producenta začne odesílat události do storm-events tématu. Měly by se zobrazit protokoly podobné následujícím protokolům:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Pokud chcete zkontrolovat protokoly, spusťte v samostatném terminálu následující příkaz:

    docker-compose logs -f | grep kusto-connect
    

Spuštění konektoru

Ke spuštění konektoru použijte volání REST služby Kafka Connect.

  1. V samostatném terminálu spusťte úlohu jímky pomocí následujícího příkazu:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Pokud chcete zkontrolovat stav, spusťte v samostatném terminálu následující příkaz:

    curl http://localhost:8083/connectors/storm/status
    

Konektor začne zařadovat procesy příjmu dat do fronty.

Poznámka:

Pokud máte problémy s konektorem protokolu, vytvořte problém.

Spravovaná identita

Ve výchozím nastavení konektor Kafka používá metodu aplikace pro ověřování během příjmu dat. Ověření pomocí spravované identity:

  1. Přiřaďte cluster spravované identitě a udělte účtu úložiště oprávnění ke čtení. Další informace najdete v tématu Ingestování dat pomocí ověřování spravované identity.

  2. V souboru adx-sink-config.json nastavte managed_identity aad.auth.strategy a ujistěte se, že aad.auth.appid je nastavené NA ID klienta spravované identity (aplikace).

  3. Místo instančního objektu Microsoft Entra použijte token služby metadat privátní instance.

Poznámka:

Při použití spravované identity appId a tenant jsou vyvolány z kontextu webu volání a password nejsou potřeba.

Dotazování a kontrola dat

Potvrzení příjmu dat

  1. Jakmile data dorazí do Storms tabulky, potvrďte přenos dat tak, že zkontrolujete počet řádků:

    Storms 
    | count
    
  2. Ověřte, že v procesu příjmu dat nedošlo k žádným selháním:

    .show ingestion failures
    

    Jakmile uvidíte data, vyzkoušejte několik dotazů.

Vytváření dotazů na data

  1. Pokud chcete zobrazit všechny záznamy, spusťte následující dotaz:

    Storms
    | take 10
    
  2. Použití where a project filtrování konkrétních dat:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. summarize Použijte operátor:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Snímek obrazovky s připojenými výsledky sloupcového grafu dotazu Kafka

Další příklady a pokyny k dotazům najdete v dokumentaci k psaní dotazů v KQL a dotazovací jazyk Kusto.

Reset

Pokud chcete resetovat, proveďte následující kroky:

  1. Zastavení kontejnerů (docker-compose down -v)
  2. Odstranit (drop table Storms)
  3. Opětovné vytvoření Storms tabulky
  4. Opětovné vytvoření mapování tabulek
  5. Restartování kontejnerů (docker-compose up)

Vyčištění prostředků

Pokud chcete odstranit prostředky Azure Data Exploreru, použijte příkaz az kusto cluster delete (rozšíření Kusto) nebo az kusto database delete (rozšíření Kusto):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Cluster a databázi můžete také odstranit prostřednictvím webu Azure Portal. Další informace najdete v tématu Odstranění clusteru Azure Data Exploreru a odstranění databáze v Azure Data Exploreru.

Ladění konektoru jímky Kafka

Vylaďte konektor jímky Kafka tak, aby fungoval se zásadami dávkování příjmu dat:

  • Vylaďte limit velikosti jímky flush.size.bytes Kafka od 1 MB, což se zvyšuje o 10 MB nebo 100 MB.
  • Při použití jímky Kafka se data agregují dvakrát. Data na straně konektoru se agregují podle nastavení vyprazdňování a na straně služby podle zásad dávkování. Pokud je doba dávkování příliš krátká, aby data nemohla být ingestována konektorem i službou, je nutné zvýšit dobu dávkování. Nastavte velikost dávkování na 1 GB a podle potřeby zvyšte nebo snižte o 100 MB. Pokud je například velikost vyprázdnění 1 MB a velikost zásad dávkování je 100 MB, konektor jímky Kafka agreguje data do dávky 100 MB. Tato dávka se pak ingestuje službou. Pokud je doba dávkování zásad 20 sekund a konektor jímky Kafka vyprázdní 50 MB během 20sekundového období, služba ingestuje dávku o velikosti 50 MB.
  • Škálování můžete provést přidáním instancí a oddílů Kafka. Zvyšte tasks.max počet oddílů. Pokud máte dostatek dat k vytvoření objektu blob velikosti flush.size.bytes nastavení, vytvořte oddíl. Pokud je objekt blob menší, dávka se zpracuje, když dosáhne časového limitu, takže oddíl neobdrží dostatečnou propustnost. Velký počet oddílů znamená větší režii na zpracování.