Table-API en SQL in Apache Flink®-clusters op HDInsight op AKS
Belangrijk
Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Ontdek meer over met deze aankondiging.
U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.
Belangrijk
Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight op AKS previewinformatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.
Apache Flink bevat twee relationele API's, de Table-API en SQL, voor geïntegreerde stream- en batchverwerking. De Table-API is een met taal geïntegreerde query-API waarmee u query's kunt samenstellen van relationele operators, zoals selectie, filter en join intuïtief. De SQL-ondersteuning van Flink is gebaseerd op Apache Calcite, waarmee de SQL-standaard wordt geïmplementeerd.
De Table-API en SQL-interfaces kunnen naadloos worden geïntegreerd met elkaar en de DataStream-API van Flink. U kunt eenvoudig schakelen tussen alle API's en bibliotheken, die hierop zijn gebaseerd.
Apache Flink SQL
Net als andere SQL-engines werken Flink-query's op tabellen. Het verschilt van een traditionele database omdat Flink geen statische data lokaal beheert; in plaats daarvan worden de query's continu uitgevoerd over externe tabellen.
Flink pijplijnen voor gegevensverwerking beginnen met brontabellen en eindigen met sinktabellen. Brontabellen produceren rijen die worden verwerkt tijdens de uitvoering van de query; dit zijn de tabellen waarnaar wordt verwezen in het FROM deel van een query. Connectors kunnen van het type HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databases, bestandssysteem of een ander systeem zijn waarvan de connector zich in het klassepad bevindt.
Flink SQL Client gebruiken in HDInsight op AKS-clusters
Raadpleeg dit artikel over het gebruik van CLI vanuit Secure Shell- in Azure Portal. Hier volgen enkele snelle voorbeelden van hoe u aan de slag kunt gaan.
De SQL-client starten
./bin/sql-client.sh
Een initialisatie-SQL-bestand doorgeven om samen met sql-client uit te voeren
./sql-client.sh -i /path/to/init_file.sql
Een configuratie instellen in sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL ondersteunt de volgende CREATE-instructies
- CREATE TABLE
- DATABASE MAKEN
- CATALOGUS MAKEN
Hieronder volgt een voorbeeld van de syntaxis voor het definiëren van een brontabel met behulp van de jdbc-connector voor verbinding met MSSQL, met id, naam als kolommen in een CREATE TABLE-INSTRUCTIE
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
MAAK DATABASE AAN:
CREATE DATABASE students;
CREATE CATALOG:
CREATE CATALOG myhive WITH ('type'='hive');
U kunt doorlopende query's uitvoeren boven op deze tabellen
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Schrijf naar sinktabel uit brontabel:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Afhankelijkheden toevoegen
JAR-instructies worden gebruikt om gebruikers-JAR-bestanden toe te voegen aan het klassepad of gebruikers-JAR's te verwijderen uit het klassepad of om toegevoegde JAR's weer te geven in het klassepad in de runtime.
Flink SQL ondersteunt de volgende JAR-instructies:
- JAR TOEVOEGEN
- JARS WEERGEVEN
- JAR VERWIJDEREN
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive Metastore van Apache Flink®-clusters in HDInsight op AKS
Catalogi bieden metagegevens, zoals databases, tabellen, partities, weergaven en functies en informatie die nodig is voor toegang tot gegevens die zijn opgeslagen in een database of andere externe systemen.
In HDInsight op AKS ondersteunen we twee catalogusopties:
GenericInMemoryCatalog
De GenericInMemoryCatalog is een in-memory implementatie van een catalogus. Alle objecten zijn alleen beschikbaar voor de levensduur van de SQL-sessie.
HiveCatalog-
De HiveCatalog- dient twee doeleinden; als permanente opslag voor pure Flink-metagegevens en als interface voor het lezen en schrijven van bestaande Hive-metagegevens.
Notitie
HDInsight op AKS-clusters wordt geleverd met een geïntegreerde optie van Hive Metastore voor Apache Flink. U kunt tijdens het maken van cluster kiezen voor Hive Metastore
Flink Databases maken en registreren bij catalogi
U kunt dit artikel raadplegen over het gebruik van CLI en aan de slag met Flink SQL Client vanuit Secure Shell in Azure Portal.
sql-client.sh
sessie startenDefault_catalog is de standaardcatalogus in het geheugen
Laten we nu de standaarddatabase van de catalogus in het geheugen controleren
Laten we Hive Catalog van versie 3.1.2 maken en gebruiken
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Notitie
HDInsight op AKS ondersteunt Hive 3.1.2 en Hadoop 3.3.2. De
hive-conf-dir
is ingesteld op locatie/opt/hive-conf
Laten we de Database in de hive-catalogus maken en deze standaard instellen voor de sessie (tenzij gewijzigd).
Hive-tabellen maken en registreren bij Hive-catalogus
Volg de instructies voor Het maken en registreren van Flink Databases voor Catalog
Laten we Flink Table van connectortype Hive maken zonder partitie
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Gegevens invoegen in hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Gegevens lezen uit hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Notitie
Hive Warehouse Directory bevindt zich in de aangewezen container van het opslagaccount dat is gekozen tijdens het maken van het Apache Flink-cluster, vindt u in directory hive/warehouse/
Hiermee kunt u Flink Table van verbindingslijntype Hive maken met Partition
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Belangrijk
Er is een bekende beperking in Apache Flink. De laatste 'n' kolommen worden gekozen voor partities, ongeacht de door de gebruiker gedefinieerde partitiekolom. FLINK-32596 De partitiesleutel is onjuist wanneer u Flink dialect gebruikt om een Hive-tabel te maken.
Referentie
- Apache Flink Table API & SQL
- Apache, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).