Tabell-API och SQL i Apache Flink-kluster® i HDInsight på AKS
Kommentar
Vi drar tillbaka Azure HDInsight på AKS den 31 januari 2025. Före den 31 januari 2025 måste du migrera dina arbetsbelastningar till Microsoft Fabric eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar. Återstående kluster i din prenumeration stoppas och tas bort från värden.
Endast grundläggande stöd kommer att vara tillgängligt fram till datumet för pensionering.
Viktigt!
Den här funktionen finns i förhandsgranskning. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har släppts i allmän tillgänglighet. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. Om du vill ha frågor eller funktionsförslag skickar du en begäran på AskHDInsight med informationen och följer oss för fler uppdateringar i Azure HDInsight Community.
Apache Flink har två relations-API:er – tabell-API:et och SQL – för enhetlig dataström och batchbearbetning. Tabell-API:et är ett språkintegrerad fråge-API som tillåter sammansättningen av frågor från relationsoperatorer, till exempel val, filter och koppling intuitivt. Flinks SQL-stöd baseras på Apache Calcite, som implementerar SQL-standarden.
Tabell-API:et och SQL-gränssnitten integreras sömlöst med varandra och Flinks DataStream-API. Du kan enkelt växla mellan alla API:er och bibliotek, som bygger på dem.
Apache Flink SQL
Liksom andra SQL-motorer fungerar Flink-frågor ovanpå tabeller. Den skiljer sig från en traditionell databas eftersom Flink inte hanterar vilande data lokalt. i stället fungerar frågorna kontinuerligt över externa tabeller.
Flink-databearbetningspipelines börjar med källtabeller och slutar med mottagartabeller. Källtabeller skapar rader som körs under frågekörningen. de är tabellerna som refereras i FROM-satsen för en fråga. Anslutningsappar kan vara av typen HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databaser, filsystem eller andra system vars anslutningsapp ligger i klassökvägen.
Använda Flink SQL Client i HDInsight i AKS-kluster
Du kan läsa den här artikeln om hur du använder CLI från Secure Shell på Azure Portal. Här följer några snabba exempel på hur du kommer igång.
Starta SQL-klienten
./bin/sql-client.sh
Skicka en sql-initieringsfil som ska köras tillsammans med sql-client
./sql-client.sh -i /path/to/init_file.sql
Så här anger du en konfiguration i sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL stöder följande CREATE-instruktioner
- CREATE TABLE
- SKAPA DATABAS
- SKAPA KATALOG
Följande är en exempelsyntax för att definiera en källtabell med jdbc-anslutningsprogrammet för att ansluta till MSSQL, med ID, namn som kolumner i en CREATE TABLE-instruktion
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
SKAPA DATABAS :
CREATE DATABASE students;
SKAPA KATALOG:
CREATE CATALOG myhive WITH ('type'='hive');
Du kan köra kontinuerliga frågor ovanpå dessa tabeller
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Skriv ut till mottagartabellen från källtabellen:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Lägga till beroenden
JAR-uttryck används för att lägga till användarburkar i klassökvägen eller ta bort användarburkar från klassökvägen eller visa tillagda jars i klassökvägen i körningen.
Flink SQL stöder följande JAR-instruktioner:
- ADD JAR
- VISA JAR-FLÖDEN
- TA BORT JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive-metaarkiv i Apache Flink-kluster® i HDInsight på AKS
Kataloger tillhandahåller metadata, till exempel databaser, tabeller, partitioner, vyer och funktioner och information som behövs för att komma åt data som lagras i en databas eller andra externa system.
I HDInsight på AKS stöder Flink två katalogalternativ:
GenericInMemoryCatalog
GenericInMemoryCatalog är en minnesintern implementering av en katalog. Alla objekt är endast tillgängliga under sql-sessionens livslängd.
HiveCatalog
HiveCatalog har två syften: som beständig lagring för rena Flink-metadata och som ett gränssnitt för att läsa och skriva befintliga Hive-metadata.
Kommentar
HDInsight på AKS-kluster levereras med ett integrerat alternativ för Hive Metastore för Apache Flink. Du kan välja Hive Metastore när klustret skapas
Skapa och registrera Flink-databaser i kataloger
Du kan läsa den här artikeln om hur du använder CLI och kommer igång med Flink SQL Client från Secure Shell på Azure Portal.
Starta
sql-client.sh
sessionDefault_catalog är standardkatalogen i minnet
Låt oss nu kontrollera standarddatabasen för minnesintern katalog
Låt oss skapa Hive-katalogen av version 3.1.2 och använda den
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Kommentar
HDInsight på AKS stöder Hive 3.1.2 och Hadoop 3.3.2.
hive-conf-dir
Är inställt på plats/opt/hive-conf
Låt oss skapa databasen i hive-katalogen och göra den till standard för sessionen (om den inte har ändrats).
Skapa och registrera Hive-tabeller i Hive-katalogen
Följ anvisningarna om hur du skapar och registrerar Flink-databaser till katalog
Låt oss skapa Flink-tabell av anslutningstyp Hive utan partition
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Infoga data i hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Läsa data från hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Kommentar
Hive Warehouse Directory finns i den avsedda containern för lagringskontot som valdes när Apache Flink-klustret skapades, finns i katalogen hive/warehouse/
Låter skapa Flink-tabell av anslutningstypsdatafil med partition
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Viktigt!
Det finns en känd begränsning i Apache Flink. De sista n-kolumnerna väljs för partitioner, oavsett användardefinierad partitionskolumn. FLINK-32596 Partitionsnyckeln är fel när du använder Flink-dialekt för att skapa Hive-tabell.
Referens
- Apache Flink Table API & SQL
- Apache, Apache Flink, Flink och associerade öppen källkod projektnamn är varumärken som tillhör Apache Software Foundation (ASF).