Hämta data från Kafka
Apache Kafka är en distribuerad strömningsplattform för att skapa strömmande datapipelines i realtid som på ett tillförlitligt sätt flyttar data mellan system eller program. Kafka Connect är ett verktyg för skalbar och tillförlitlig strömning av data mellan Apache Kafka och andra datasystem. Kusto Kafka-mottagaren fungerar som anslutningsapp från Kafka och kräver inte att du använder kod. Ladda ned mottagaranslutningsburken från Git-lagringsplatsen eller Confluent Connector Hub.
Den här artikeln visar hur du matar in data med Kafka med hjälp av en fristående Docker-konfiguration för att förenkla konfigurationen av Kafka-klustret och Kafka-anslutningsklustret.
Mer information finns i Git-lagringsplatsen för anslutningsappen och versionsspecifika.
Förutsättningar
- En Azure-prenumeration. Skapa ett kostnadsfritt Azure-konto.
- En KQL-databas i Microsoft Fabric.
- Din databasinmatnings-URI och fråge-URI som ska användas i JSON-konfigurationsfilen. Mer information finns i Kopiera URI.
- Azure CLI.
- Docker och Docker Compose.
Skapa ett huvudnamn för Microsoft Entra-tjänsten
Microsoft Entra-tjänstens huvudnamn kan skapas via Azure Portal eller programatiskt, som i följande exempel.
Tjänstens huvudnamn är den identitet som används av anslutningsappen för att skriva data i tabellen i Kusto. Du beviljar behörigheter för tjänstens huvudnamn för åtkomst till Kusto-resurser.
Logga in på din Azure-prenumeration via Azure CLI. Autentisera sedan i webbläsaren.
az login
Välj den prenumeration som ska vara värd för huvudkontot. Det här steget behövs när du har flera prenumerationer.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Skapa tjänstens huvudnamn. I det här exemplet kallas
my-service-principal
tjänstens huvudnamn .az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Från de returnerade JSON-data kopierar du
appId
,password
ochtenant
för framtida användning.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Du har skapat ditt Microsoft Entra-program och tjänstens huvudnamn.
Skapa en måltabell
Skapa en tabell med
Storms
namnet från frågemiljön med följande kommando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Skapa motsvarande tabellmappning
Storms_CSV_Mapping
för inmatade data med hjälp av följande kommando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Skapa en inmatningsbatchprincip i tabellen för konfigurerbar svarstid för inmatning i kö.
Dricks
Inmatningsbatchprincipen är en prestandaoptimerare och innehåller tre parametrar. Det första villkoret som uppfylls utlöser inmatning i tabellen.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Använd tjänstens huvudnamn från Skapa ett Microsoft Entra-tjänsthuvudnamn för att bevilja behörighet att arbeta med databasen.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Kör labbet
Följande labb är utformat för att ge dig erfarenhet av att börja skapa data, konfigurera Kafka-anslutningsappen och strömma dessa data. Du kan sedan titta på inmatade data.
Klona git-lagringsplatsen
Klona labbets git-lagringsplats.
Skapa en lokal katalog på datorn.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Klona lagringsplatsen.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Innehållet i den klonade lagringsplatsen
Kör följande kommando för att visa innehållet i den klonade lagringsplatsen:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Det här resultatet av den här sökningen är:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Granska filerna på den klonade lagringsplatsen
I följande avsnitt beskrivs de viktiga delarna av filerna i filträdet ovan.
adx-sink-config.json
Den här filen innehåller kusto-mottagaregenskaper där du uppdaterar specifik konfigurationsinformation:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "<ingestion URI per prerequisites>",
"kusto.query.url": "<query URI per prerequisites>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Ersätt värdena för följande attribut enligt konfigurationen: aad.auth.authority
, aad.auth.appid
, , kusto.tables.topics.mapping
aad.auth.appkey
(databasnamnet), kusto.ingestion.url
och kusto.query.url
.
Anslutningsprogram – Dockerfile
Den här filen har kommandon för att generera docker-avbildningen för anslutningsinstansen. Den innehåller nedladdningen av anslutningsappen från git-lagringsplatsens versionskatalog.
Storm-events-producer-katalog
Den här katalogen har ett Go-program som läser en lokal "StormEvents.csv"-fil och publicerar data till ett Kafka-ämne.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Starta containrarna
Starta containrarna i en terminal:
docker-compose up
Producentprogrammet börjar skicka händelser till ämnet
storm-events
. Du bör se loggar som liknar följande loggar:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Kontrollera loggarna genom att köra följande kommando i en separat terminal:
docker-compose logs -f | grep kusto-connect
Starta anslutningsappen
Använd ett Kafka Connect REST-anrop för att starta anslutningsappen.
Starta mottagaraktiviteten i en separat terminal med följande kommando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Kontrollera statusen genom att köra följande kommando i en separat terminal:
curl http://localhost:8083/connectors/storm/status
Anslutningsappen börjar köa inmatningsprocesser.
Kommentar
Om du har problem med logganslutningsappen skapar du ett problem.
Fråga efter och granska data
Bekräfta datainmatning
När data har anlänt till
Storms
tabellen bekräftar du dataöverföringen genom att kontrollera radantalet:Storms | count
Bekräfta att det inte finns några fel i inmatningsprocessen:
.show ingestion failures
När du ser data kan du prova några frågor.
Fråga efter data
Kör följande fråga för att se alla poster:
Storms | take 10
Använd
where
ochproject
för att filtrera specifika data:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Använd operatorn
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Fler frågeexempel och vägledning finns i Skriva frågor i KQL och Kusto-frågespråk dokumentation.
Reset
Gör följande för att återställa:
- Stoppa containrarna (
docker-compose down -v
) - Ta bort (
drop table Storms
) Storms
Återskapa tabellen- Återskapa tabellmappning
- Starta om containrar (
docker-compose up
)
Rensa resurser
Rensa de objekt som skapats genom att navigera till arbetsytan där de skapades.
Hovra över databasen på arbetsytan och välj menyn Mer [...] > Ta bort.
Välj Ta bort. Du kan inte återställa borttagna objekt.
Justera Kafka Sink-anslutningsappen
Justera Kafka Sink-anslutningsappen så att den fungerar med inmatningsbatchprincipen:
- Justera storleksgränsen för Kafka-mottagare
flush.size.bytes
från 1 MB och öka med steg om 10 MB eller 100 MB. - När du använder Kafka-mottagare aggregeras data två gånger. På anslutningsappen aggregeras data enligt inställningarna för tömning och på tjänstsidan enligt batchprincipen. Om batchbearbetningstiden är för kort så att data inte kan matas in av både anslutningsappen och tjänsten, måste batchtiden ökas. Ange batchstorleken till 1 GB och öka eller minska med 100 MB efter behov. Om tömningsstorleken till exempel är 1 MB och batchprincipstorleken är 100 MB, aggregerar Kafka Sink-anslutningsappen data till en batch på 100 MB. Den batchen matas sedan in av tjänsten. Om batchprinciptiden är 20 sekunder och Kafka Sink-anslutningen töms 50 MB under en 20-sekundersperiod matar tjänsten in en 50 MB-batch.
- Du kan skala genom att lägga till instanser och Kafka-partitioner. Öka
tasks.max
till antalet partitioner. Skapa en partition om du har tillräckligt med data för att skapa en blob med inställningensflush.size.bytes
storlek. Om blobben är mindre bearbetas batchen när den når tidsgränsen, så partitionen får inte tillräckligt med dataflöde. Ett stort antal partitioner innebär mer bearbetningskostnader.