Freigeben über


Tabellen-API und SQL in Apache Flink®-Clustern auf HDInsight on AKS

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Apache Flink verfügt über zwei relationale APIs – die Tabellen-API und SQL – für die einheitliche Datenstrom- und Batchverarbeitung. Die Tabellen-API ist eine LINQ-API (Language Integrated Query), die die intuitive Zusammenstellung von Abfragen von relationalen Operatoren wie Auswahl-, Filter- und Verknüpfungsoperator ermöglicht. Die SQL-Unterstützung von Flink basiert auf Apache Calcite, das den SQL-Standard implementiert.

Die Tabellen-API und SQL-Schnittstellen lassen sich nahtlos ineinander und in die DataStream-API von Flink integrieren. Sie können ganz einfach zwischen allen APIs und Bibliotheken wechseln, die darauf aufbauen.

Wie andere SQL-Engines werden Flink-Abfragen auf der Grundlage von Tabellen ausgeführt. Flink unterscheidet sich von einer herkömmlichen Datenbank, da das Framework keine ruhenden Daten lokal verwaltet. Stattdessen werden die Abfragen kontinuierlich über externe Tabellen ausgeführt.

Flink-Datenverarbeitungspipelines beginnen mit Quelltabellen und enden mit Senkentabellen. Quelltabellen erzeugen Zeilen, die während der Abfrageausführung ausgeführt werden. Es handelt sich dabei um die Tabellen, auf die in der FROM-Klausel einer Abfrage verwiesen wird. Connectors können vom Typ „HDInsight Kafka“, „HDInsight HBase“, „Azure Event Hubs“ bzw. Datenbanken, Dateisysteme oder andere Systeme sein, deren Connector sich im Klassenpfad befindet.

In diesem Artikel erfahren Sie, wie Sie die CLI über Secure Shell im Azure-Portal verwenden. Hier finden Sie einige kurze Beispiele für die ersten Schritte.

  • So starten Sie den SQL-Client:

    ./bin/sql-client.sh
    
  • So übergeben Sie eine SQL-Initialisierungsdatei, die zusammen mit dem SQL-Client ausgeführt werden soll:

    ./sql-client.sh -i /path/to/init_file.sql
    
  • So legen Sie eine Konfiguration im SQL-Client fest:

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL-DDL

Flink SQL unterstützt die folgenden CREATE-Anweisungen:

  • CREATE TABLE
  • CREATE DATABASE
  • CREATE CATALOG

Im Folgenden sehen Sie eine Beispielsyntax zum Definieren einer Quelltabelle mithilfe eines JDBC-Connectors für die Verbindung mit MSSQL mit ID und Name als Spalten in einer CREATE TABLE-Anweisung.

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE:

CREATE DATABASE students;

CREATE CATALOG:

CREATE CATALOG myhive WITH ('type'='hive');

Sie können kontinuierliche Abfragen auf Basis dieser Tabellen ausführen.

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Schreiben Sie aus der Quelltabelle in die Senkentabelle:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Hinzufügung von Abhängigkeiten

JAR-Anweisungen werden verwendet, um Benutzer-JARs zum Klassenpfad hinzuzufügen Benutzer-JARs aus dem Klassenpfad zu entfernen oder hinzugefügte JARs im Klassenpfad in der Runtime anzuzeigen.

Flink SQL unterstützt die folgenden JAR-Anweisungen:

  • ADD JAR
  • SHOW JARS
  • REMOVE JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Kataloge stellen Metadaten bereit, z. B. Datenbanken, Tabellen, Partitionen, Ansichten und Funktionen und Informationen, die für den Zugriff auf in einer Datenbank oder anderen externen Systemen gespeicherte Daten erforderlich sind.

In HDInsight on AKS Flink werden zwei Katalogoptionen unterstützt:

GenericInMemoryCatalog

GenericInMemoryCatalog ist eine In-Memory-Implementierung eines Katalogs. Alle Objekte sind nur für die Lebensdauer der SQL-Sitzung verfügbar.

HiveCatalog

HiveCatalog dient zwei Zwecken: beständiger Speicher für reine Flink-Metadaten und Schnittstelle zum Lesen und Schreiben vorhandener Hive-Metadaten.

Hinweis

HDInsight on AKS-Clustern verfügt über eine integrierte Option von Hive Metastore für Apache Flink. Sie können den Hive-Metastore während der Clustererstellung aktivieren.

In diesem Artikel erfahren Sie, wie Sie die CLI verwenden und erste Schritte mit dem Flink-SQL-Client über Secure Shell im Azure-Portal ausführen.

  • Starten der sql-client.sh-Sitzung

    Screenshot: Standard-Hive-Katalog

    Default_catalog ist der standardmäßige In-Memory-Katalog.

  • Überprüfen wir nun die Standarddatenbank des In-Memory-Katalogs: Screenshot: In-Memory-Standardkatalog

  • Erstellen und verwenden wir nun den Hive-Katalog der Version 3.1.2:

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Hinweis

    HDInsight on AKS unterstützt Hive 3.1.2 und Hadoop 3.3.2. hive-conf-dir ist auf den Standort /opt/hive-conf festgelegt.

  • Erstellen wir die Datenbank im Hive-Katalog und legen sie als Standard für die Sitzung fest (sofern sie nicht geändert wurde). Screenshot: Erstellen einer Datenbank im Strukturkatalog und Festlegen des Standardkatalogs für die Sitzung

Erstellen und Registrieren von Hive-Tabellen im Hive-Katalog

  • Befolgen Sie die Anweisungen unter Erstellen und Registrieren von Flink-Datenbanken in Katalogen.

  • Erstellen wir eine Flink-Tabelle vom Connectortyp „Hive ohne Partition“:

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Fügen wir Daten in die Hive-Tabelle (hive_table) ein:

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Lesen wir Daten aus der Hive-Tabelle (hive_table):

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Hinweis

    Das Hive-Warehouseverzeichnis befindet sich im angegebenen Container des Speicherkontos, das während der Apache Flink-Clustererstellung ausgewählt wurde: hive/warehouse/.

  • Erstellen wir eine Flink-Tabelle vom Connectortyp „Hive mit Partition“:

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Wichtig

Es gibt eine bekannte Einschränkung in Apache Flink. Die letzten n-Spalten werden für Partitionen ausgewählt, unabhängig von der benutzerdefinierten Partitionsspalte. FLINK-32596 Der Partitionsschlüssel ist falsch, wenn Sie Flink-Dialekt zum Erstellen einer Hive-Tabelle verwenden.

Verweis