Tabellen-API und SQL in Apache Flink®-Clustern auf HDInsight on AKS
Hinweis
Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.
Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.
Wichtig
Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.
Apache Flink verfügt über zwei relationale APIs – die Tabellen-API und SQL – für die einheitliche Datenstrom- und Batchverarbeitung. Die Tabellen-API ist eine LINQ-API (Language Integrated Query), die die intuitive Zusammenstellung von Abfragen von relationalen Operatoren wie Auswahl-, Filter- und Verknüpfungsoperator ermöglicht. Die SQL-Unterstützung von Flink basiert auf Apache Calcite, das den SQL-Standard implementiert.
Die Tabellen-API und SQL-Schnittstellen lassen sich nahtlos ineinander und in die DataStream-API von Flink integrieren. Sie können ganz einfach zwischen allen APIs und Bibliotheken wechseln, die darauf aufbauen.
Apache Flink SQL
Wie andere SQL-Engines werden Flink-Abfragen auf der Grundlage von Tabellen ausgeführt. Flink unterscheidet sich von einer herkömmlichen Datenbank, da das Framework keine ruhenden Daten lokal verwaltet. Stattdessen werden die Abfragen kontinuierlich über externe Tabellen ausgeführt.
Flink-Datenverarbeitungspipelines beginnen mit Quelltabellen und enden mit Senkentabellen. Quelltabellen erzeugen Zeilen, die während der Abfrageausführung ausgeführt werden. Es handelt sich dabei um die Tabellen, auf die in der FROM-Klausel einer Abfrage verwiesen wird. Connectors können vom Typ „HDInsight Kafka“, „HDInsight HBase“, „Azure Event Hubs“ bzw. Datenbanken, Dateisysteme oder andere Systeme sein, deren Connector sich im Klassenpfad befindet.
Verwenden des Flink SQL-Clients in HDInsight auf AKS-Clustern
In diesem Artikel erfahren Sie, wie Sie die CLI über Secure Shell im Azure-Portal verwenden. Hier finden Sie einige kurze Beispiele für die ersten Schritte.
So starten Sie den SQL-Client:
./bin/sql-client.sh
So übergeben Sie eine SQL-Initialisierungsdatei, die zusammen mit dem SQL-Client ausgeführt werden soll:
./sql-client.sh -i /path/to/init_file.sql
So legen Sie eine Konfiguration im SQL-Client fest:
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL-DDL
Flink SQL unterstützt die folgenden CREATE-Anweisungen:
- CREATE TABLE
- CREATE DATABASE
- CREATE CATALOG
Im Folgenden sehen Sie eine Beispielsyntax zum Definieren einer Quelltabelle mithilfe eines JDBC-Connectors für die Verbindung mit MSSQL mit ID und Name als Spalten in einer CREATE TABLE-Anweisung.
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
CREATE DATABASE:
CREATE DATABASE students;
CREATE CATALOG:
CREATE CATALOG myhive WITH ('type'='hive');
Sie können kontinuierliche Abfragen auf Basis dieser Tabellen ausführen.
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Schreiben Sie aus der Quelltabelle in die Senkentabelle:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Hinzufügung von Abhängigkeiten
JAR-Anweisungen werden verwendet, um Benutzer-JARs zum Klassenpfad hinzuzufügen Benutzer-JARs aus dem Klassenpfad zu entfernen oder hinzugefügte JARs im Klassenpfad in der Runtime anzuzeigen.
Flink SQL unterstützt die folgenden JAR-Anweisungen:
- ADD JAR
- SHOW JARS
- REMOVE JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive-Metastore in Apache Flink®-Clustern auf HDInsight on AKS
Kataloge stellen Metadaten bereit, z. B. Datenbanken, Tabellen, Partitionen, Ansichten und Funktionen und Informationen, die für den Zugriff auf in einer Datenbank oder anderen externen Systemen gespeicherte Daten erforderlich sind.
In HDInsight on AKS Flink werden zwei Katalogoptionen unterstützt:
GenericInMemoryCatalog
GenericInMemoryCatalog ist eine In-Memory-Implementierung eines Katalogs. Alle Objekte sind nur für die Lebensdauer der SQL-Sitzung verfügbar.
HiveCatalog
HiveCatalog dient zwei Zwecken: beständiger Speicher für reine Flink-Metadaten und Schnittstelle zum Lesen und Schreiben vorhandener Hive-Metadaten.
Hinweis
HDInsight on AKS-Clustern verfügt über eine integrierte Option von Hive Metastore für Apache Flink. Sie können den Hive-Metastore während der Clustererstellung aktivieren.
Erstellen und Registrieren von Flink-Datenbanken in Katalogen
In diesem Artikel erfahren Sie, wie Sie die CLI verwenden und erste Schritte mit dem Flink-SQL-Client über Secure Shell im Azure-Portal ausführen.
Starten der
sql-client.sh
-SitzungDefault_catalog ist der standardmäßige In-Memory-Katalog.
Überprüfen wir nun die Standarddatenbank des In-Memory-Katalogs:
Erstellen und verwenden wir nun den Hive-Katalog der Version 3.1.2:
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Hinweis
HDInsight on AKS unterstützt Hive 3.1.2 und Hadoop 3.3.2.
hive-conf-dir
ist auf den Standort/opt/hive-conf
festgelegt.Erstellen wir die Datenbank im Hive-Katalog und legen sie als Standard für die Sitzung fest (sofern sie nicht geändert wurde).
Erstellen und Registrieren von Hive-Tabellen im Hive-Katalog
Befolgen Sie die Anweisungen unter Erstellen und Registrieren von Flink-Datenbanken in Katalogen.
Erstellen wir eine Flink-Tabelle vom Connectortyp „Hive ohne Partition“:
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Fügen wir Daten in die Hive-Tabelle (hive_table) ein:
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Lesen wir Daten aus der Hive-Tabelle (hive_table):
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Hinweis
Das Hive-Warehouseverzeichnis befindet sich im angegebenen Container des Speicherkontos, das während der Apache Flink-Clustererstellung ausgewählt wurde: hive/warehouse/.
Erstellen wir eine Flink-Tabelle vom Connectortyp „Hive mit Partition“:
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Wichtig
Es gibt eine bekannte Einschränkung in Apache Flink. Die letzten n-Spalten werden für Partitionen ausgewählt, unabhängig von der benutzerdefinierten Partitionsspalte. FLINK-32596 Der Partitionsschlüssel ist falsch, wenn Sie Flink-Dialekt zum Erstellen einer Hive-Tabelle verwenden.
Verweis
- Apache Flink-Tabellen-API & SQL
- Apache, Apache Flink, Flink und zugehörige Open Source-Projektnamen sind Handelsmarken der Apache Software Foundation (ASF).