Dela via


Hämta data från Kafka

Apache Kafka är en distribuerad strömningsplattform för att skapa strömmande datapipelines i realtid som på ett tillförlitligt sätt flyttar data mellan system eller program. Kafka Connect är ett verktyg för skalbar och tillförlitlig strömning av data mellan Apache Kafka och andra datasystem. Kusto Kafka-mottagaren fungerar som anslutningsapp från Kafka och kräver inte att du använder kod. Ladda ned mottagaranslutningsburken från Git-lagringsplatsen eller Confluent Connector Hub.

Den här artikeln visar hur du matar in data med Kafka med hjälp av en fristående Docker-konfiguration för att förenkla konfigurationen av Kafka-klustret och Kafka-anslutningsklustret.

Mer information finns i Git-lagringsplatsen för anslutningsappen och versionsspecifika.

Förutsättningar

Skapa ett huvudnamn för Microsoft Entra-tjänsten

Microsoft Entra-tjänstens huvudnamn kan skapas via Azure Portal eller programatiskt, som i följande exempel.

Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Du beviljar behörigheter för tjänstens huvudnamn för åtkomst till Kusto-resurser.

  1. Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.

    az login
    
  2. Välj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Skapa tjänstens huvudnamn. I det här exemplet kallas my-service-principaltjänstens huvudnamn .

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Från de returnerade JSON-data kopierar du appId, passwordoch tenant för framtida användning.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.

Skapa en måltabell

  1. Skapa en tabell med Storms namnet från frågemiljön med följande kommando:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Skapa motsvarande tabellmappning Storms_CSV_Mapping för inmatade data med hjälp av följande kommando:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Skapa en inmatningsbatchprincip i tabellen för konfigurerbar svarstid för inmatning i kö.

    Dricks

    Inmatningsbatchprincipen är en prestandaoptimerare och innehåller tre parametrar. Det första villkoret som uppfylls utlöser inmatning i tabellen.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Använd tjänstens huvudnamn från Skapa ett Microsoft Entra-tjänsthuvudnamn för att bevilja behörighet att arbeta med databasen.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Kör labbet

Följande labb är utformat för att ge dig erfarenhet av att börja skapa data, konfigurera Kafka-anslutningsappen och strömma dessa data. Du kan sedan titta på inmatade data.

Klona git-lagringsplatsen

Klona labbets git-lagringsplats.

  1. Skapa en lokal katalog på datorn.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Klona lagringsplatsen.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Innehållet i den klonade lagringsplatsen

Kör följande kommando för att visa innehållet i den klonade lagringsplatsen:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Det här resultatet av den här sökningen är:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Granska filerna på den klonade lagringsplatsen

I följande avsnitt beskrivs de viktiga delarna av filerna i filträdet ovan.

adx-sink-config.json

Den här filen innehåller kusto-mottagaregenskaper där du uppdaterar specifik konfigurationsinformation:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "<ingestion URI per prerequisites>",
        "kusto.query.url": "<query URI per prerequisites>",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Ersätt värdena för följande attribut enligt konfigurationen: aad.auth.authority, aad.auth.appid, , kusto.tables.topics.mapping aad.auth.appkey(databasnamnet), kusto.ingestion.urloch kusto.query.url.

Anslutningsprogram – Dockerfile

Den här filen har kommandon för att generera docker-avbildningen för anslutningsinstansen. Den innehåller nedladdningen av anslutningsappen från git-lagringsplatsens versionskatalog.

Storm-events-producer-katalog

Den här katalogen har ett Go-program som läser en lokal "StormEvents.csv"-fil och publicerar data till ett Kafka-ämne.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Starta containrarna

  1. Starta containrarna i en terminal:

    docker-compose up
    

    Producentprogrammet börjar skicka händelser till ämnet storm-events . Du bör se loggar som liknar följande loggar:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Kontrollera loggarna genom att köra följande kommando i en separat terminal:

    docker-compose logs -f | grep kusto-connect
    

Starta anslutningsappen

Använd ett Kafka Connect REST-anrop för att starta anslutningsappen.

  1. Starta mottagaraktiviteten i en separat terminal med följande kommando:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Kontrollera statusen genom att köra följande kommando i en separat terminal:

    curl http://localhost:8083/connectors/storm/status
    

Anslutningsappen börjar köa inmatningsprocesser.

Kommentar

Om du har problem med logganslutningsappen skapar du ett problem.

Fråga efter och granska data

Bekräfta datainmatning

  1. När data har anlänt till Storms tabellen bekräftar du dataöverföringen genom att kontrollera radantalet:

    Storms 
    | count
    
  2. Bekräfta att det inte finns några fel i inmatningsprocessen:

    .show ingestion failures
    

    När du ser data kan du prova några frågor.

Fråga efter data

  1. Kör följande fråga för att se alla poster:

    Storms
    | take 10
    
  2. Använd where och project för att filtrera specifika data:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Använd operatorn summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Skärmbild av anslutna kafka-frågekolumndiagramresultat.

Fler frågeexempel och vägledning finns i Skriva frågor i KQL och Kusto-frågespråk dokumentation.

Reset

Gör följande för att återställa:

  1. Stoppa containrarna (docker-compose down -v)
  2. Ta bort (drop table Storms)
  3. Storms Återskapa tabellen
  4. Återskapa tabellmappning
  5. Starta om containrar (docker-compose up)

Rensa resurser

Rensa de objekt som skapats genom att navigera till arbetsytan där de skapades.

  1. Hovra över databasen på arbetsytan och välj menyn Mer [...] > Ta bort.

  2. Välj Ta bort. Du kan inte återställa borttagna objekt.

Justera Kafka Sink-anslutningsappen

Justera Kafka Sink-anslutningsappen så att den fungerar med inmatningsbatchprincipen:

  • Justera storleksgränsen för Kafka-mottagare flush.size.bytes från 1 MB och öka med steg om 10 MB eller 100 MB.
  • När du använder Kafka-mottagare aggregeras data två gånger. På anslutningsappen aggregeras data enligt inställningarna för tömning och på tjänstsidan enligt batchprincipen. Om batchbearbetningstiden är för kort så att data inte kan matas in av både anslutningsappen och tjänsten, måste batchtiden ökas. Ange batchstorleken till 1 GB och öka eller minska med 100 MB efter behov. Om tömningsstorleken till exempel är 1 MB och batchprincipstorleken är 100 MB, aggregerar Kafka Sink-anslutningsappen data till en batch på 100 MB. Den batchen matas sedan in av tjänsten. Om batchprinciptiden är 20 sekunder och Kafka Sink-anslutningen töms 50 MB under en 20-sekundersperiod matar tjänsten in en 50 MB-batch.
  • Du kan skala genom att lägga till instanser och Kafka-partitioner. Öka tasks.max till antalet partitioner. Skapa en partition om du har tillräckligt med data för att skapa en blob med inställningens flush.size.bytes storlek. Om blobben är mindre bearbetas batchen när den når tidsgränsen, så partitionen får inte tillräckligt med dataflöde. Ett stort antal partitioner innebär mer bearbetningskostnader.