Ingestování dat z Apache Kafka do Azure Data Exploreru
Apache Kafka je distribuovaná platforma streamování pro vytváření datových kanálů streamování v reálném čase, které spolehlivě přesouvají data mezi systémy nebo aplikacemi. Kafka Connect je nástroj pro škálovatelné a spolehlivé streamování dat mezi Apache Kafka a dalšími datovými systémy. Jímka Kusto Kafka slouží jako konektor ze systému Kafka a nevyžaduje použití kódu. Stáhněte soubor JAR konektoru jímky z úložiště Git nebo centra konektorů Confluent.
Tento článek ukazuje, jak ingestovat data pomocí systému Kafka pomocí samostatného nastavení Dockeru pro zjednodušení nastavení clusteru Kafka a clusteru konektoru Kafka.
Další informace najdete v úložišti Git a konkrétních verzích konektoru.
Požadavky
- Předplatné Azure. Vytvořte bezplatný účet Azure.
- Cluster a databáze Azure Data Exploreru s výchozími zásadami ukládání do mezipaměti a uchovávání informací.
- Rozhraní příkazového řádku Azure.
- Docker a Docker Compose.
Vytvoření instančního objektu Microsoft Entra
Instanční objekt Microsoft Entra je možné vytvořit prostřednictvím webu Azure Portal nebo programově, jako v následujícím příkladu.
Tento instanční objekt je identita, kterou konektor používá k zápisu dat do tabulky v Kusto. Udělíte oprávnění pro tento instanční objekt pro přístup k prostředkům Kusto.
Přihlaste se ke svému předplatnému Azure prostřednictvím Azure CLI. Pak se ověřte v prohlížeči.
az login
Zvolte předplatné, které má být hostitelem objektu zabezpečení. Tento krok je potřeba v případě, že máte více předplatných.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Z vrácených dat JSON zkopírujte
appId
password
tenant
a pro budoucí použití.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Vytvořili jste aplikaci Microsoft Entra a instanční objekt.
Vytvoření cílové tabulky
V prostředí dotazu vytvořte tabulku s názvem
Storms
pomocí následujícího příkazu:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Pomocí následujícího příkazu vytvořte odpovídající mapování
Storms_CSV_Mapping
tabulky pro ingestované data:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Vytvořte zásadu dávkování příjmu dat v tabulce pro konfigurovatelnou latenci příjmu dat ve frontě.
Tip
Zásady dávkování příjmu dat jsou optimalizátor výkonu a obsahují tři parametry. První podmínka, která splňuje, aktivuje příjem dat do tabulky.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Pomocí instančního objektu z vytvoření instančního objektu Microsoft Entra udělte oprávnění pro práci s databází.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Spuštění testovacího prostředí
Následující cvičení je navržené tak, aby vám poskytlo prostředí pro vytváření dat, nastavení konektoru Kafka a streamování těchto dat. Pak se můžete podívat na ingestované data.
Klonování úložiště Git
Naklonujte úložiště Git testovacího prostředí.
Na počítači vytvořte místní adresář.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Naklonujte úložiště.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Obsah klonovaného úložiště
Spuštěním následujícího příkazu zobrazte seznam obsahu klonovaného úložiště:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Tento výsledek hledání:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Kontrola souborů v naklonovaném úložišti
Následující části popisují důležité části souborů ve stromu souborů.
adx-sink-config.json
Tento soubor obsahuje soubor vlastností jímky Kusto, kde aktualizujete konkrétní podrobnosti konfigurace:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Nahraďte hodnoty následujících atributů podle nastavení: , , , kusto.tables.topics.mapping
(název databáze) kusto.ingestion.url
a kusto.query.url
. aad.auth.appkey
aad.auth.appid
aad.auth.authority
Konektor – Dockerfile
Tento soubor obsahuje příkazy pro vygenerování image Dockeru pro instanci konektoru. Zahrnuje stažení konektoru z adresáře verze úložiště Git.
Adresář Storm-events-producer
Tento adresář má program Go, který čte místní soubor "StormEvents.csv" a publikuje data do tématu Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Spuštění kontejnerů
V terminálu spusťte kontejnery:
docker-compose up
Aplikace producenta začne odesílat události do
storm-events
tématu. Měly by se zobrazit protokoly podobné následujícím protokolům:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Pokud chcete zkontrolovat protokoly, spusťte v samostatném terminálu následující příkaz:
docker-compose logs -f | grep kusto-connect
Spuštění konektoru
Ke spuštění konektoru použijte volání REST služby Kafka Connect.
V samostatném terminálu spusťte úlohu jímky pomocí následujícího příkazu:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Pokud chcete zkontrolovat stav, spusťte v samostatném terminálu následující příkaz:
curl http://localhost:8083/connectors/storm/status
Konektor začne zařadovat procesy příjmu dat do fronty.
Poznámka:
Pokud máte problémy s konektorem protokolu, vytvořte problém.
Spravovaná identita
Ve výchozím nastavení konektor Kafka používá metodu aplikace pro ověřování během příjmu dat. Ověření pomocí spravované identity:
Přiřaďte cluster spravované identitě a udělte účtu úložiště oprávnění ke čtení. Další informace najdete v tématu Ingestování dat pomocí ověřování spravované identity.
V souboru adx-sink-config.json nastavte
managed_identity
aad.auth.strategy
a ujistěte se, žeaad.auth.appid
je nastavené NA ID klienta spravované identity (aplikace).Místo instančního objektu Microsoft Entra použijte token služby metadat privátní instance.
Poznámka:
Při použití spravované identity appId
a tenant
jsou vyvolány z kontextu webu volání a password
nejsou potřeba.
Dotazování a kontrola dat
Potvrzení příjmu dat
Jakmile data dorazí do
Storms
tabulky, potvrďte přenos dat tak, že zkontrolujete počet řádků:Storms | count
Ověřte, že v procesu příjmu dat nedošlo k žádným selháním:
.show ingestion failures
Jakmile uvidíte data, vyzkoušejte několik dotazů.
Vytváření dotazů na data
Pokud chcete zobrazit všechny záznamy, spusťte následující dotaz:
Storms | take 10
Použití
where
aproject
filtrování konkrétních dat:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
summarize
Použijte operátor:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Další příklady a pokyny k dotazům najdete v dokumentaci k psaní dotazů v KQL a dotazovací jazyk Kusto.
Reset
Pokud chcete resetovat, proveďte následující kroky:
- Zastavení kontejnerů (
docker-compose down -v
) - Odstranit (
drop table Storms
) - Opětovné vytvoření
Storms
tabulky - Opětovné vytvoření mapování tabulek
- Restartování kontejnerů (
docker-compose up
)
Vyčištění prostředků
Pokud chcete odstranit prostředky Azure Data Exploreru, použijte příkaz az kusto cluster delete (rozšíření Kusto) nebo az kusto database delete (rozšíření Kusto):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
Cluster a databázi můžete také odstranit prostřednictvím webu Azure Portal. Další informace najdete v tématu Odstranění clusteru Azure Data Exploreru a odstranění databáze v Azure Data Exploreru.
Ladění konektoru jímky Kafka
Vylaďte konektor jímky Kafka tak, aby fungoval se zásadami dávkování příjmu dat:
- Vylaďte limit velikosti jímky
flush.size.bytes
Kafka od 1 MB, což se zvyšuje o 10 MB nebo 100 MB. - Při použití jímky Kafka se data agregují dvakrát. Data na straně konektoru se agregují podle nastavení vyprazdňování a na straně služby podle zásad dávkování. Pokud je doba dávkování příliš krátká, aby data nemohla být ingestována konektorem i službou, je nutné zvýšit dobu dávkování. Nastavte velikost dávkování na 1 GB a podle potřeby zvyšte nebo snižte o 100 MB. Pokud je například velikost vyprázdnění 1 MB a velikost zásad dávkování je 100 MB, konektor jímky Kafka agreguje data do dávky 100 MB. Tato dávka se pak ingestuje službou. Pokud je doba dávkování zásad 20 sekund a konektor jímky Kafka vyprázdní 50 MB během 20sekundového období, služba ingestuje dávku o velikosti 50 MB.
- Škálování můžete provést přidáním instancí a oddílů Kafka. Zvyšte
tasks.max
počet oddílů. Pokud máte dostatek dat k vytvoření objektu blob velikostiflush.size.bytes
nastavení, vytvořte oddíl. Pokud je objekt blob menší, dávka se zpracuje, když dosáhne časového limitu, takže oddíl neobdrží dostatečnou propustnost. Velký počet oddílů znamená větší režii na zpracování.
Související obsah
- Přečtěte si další informace o architektuře velkých objemů dat.
- Naučte se ingestovat ukázková data ve formátu JSON do Azure Data Exploreru.
- Další informace o testovacích prostředích Kafka: