Delen via


Table-API en SQL in Apache Flink®-clusters op HDInsight op AKS

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Ontdek meer over met deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight op AKS previewinformatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

Apache Flink bevat twee relationele API's, de Table-API en SQL, voor geïntegreerde stream- en batchverwerking. De Table-API is een met taal geïntegreerde query-API waarmee u query's kunt samenstellen van relationele operators, zoals selectie, filter en join intuïtief. De SQL-ondersteuning van Flink is gebaseerd op Apache Calcite, waarmee de SQL-standaard wordt geïmplementeerd.

De Table-API en SQL-interfaces kunnen naadloos worden geïntegreerd met elkaar en de DataStream-API van Flink. U kunt eenvoudig schakelen tussen alle API's en bibliotheken, die hierop zijn gebaseerd.

Net als andere SQL-engines werken Flink-query's op tabellen. Het verschilt van een traditionele database omdat Flink geen statische data lokaal beheert; in plaats daarvan worden de query's continu uitgevoerd over externe tabellen.

Flink pijplijnen voor gegevensverwerking beginnen met brontabellen en eindigen met sinktabellen. Brontabellen produceren rijen die worden verwerkt tijdens de uitvoering van de query; dit zijn de tabellen waarnaar wordt verwezen in het FROM deel van een query. Connectors kunnen van het type HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databases, bestandssysteem of een ander systeem zijn waarvan de connector zich in het klassepad bevindt.

Raadpleeg dit artikel over het gebruik van CLI vanuit Secure Shell- in Azure Portal. Hier volgen enkele snelle voorbeelden van hoe u aan de slag kunt gaan.

  • De SQL-client starten

    ./bin/sql-client.sh
    
  • Een initialisatie-SQL-bestand doorgeven om samen met sql-client uit te voeren

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Een configuratie instellen in sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL ondersteunt de volgende CREATE-instructies

  • CREATE TABLE
  • DATABASE MAKEN
  • CATALOGUS MAKEN

Hieronder volgt een voorbeeld van de syntaxis voor het definiëren van een brontabel met behulp van de jdbc-connector voor verbinding met MSSQL, met id, naam als kolommen in een CREATE TABLE-INSTRUCTIE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

MAAK DATABASE AAN:

CREATE DATABASE students;

CREATE CATALOG:

CREATE CATALOG myhive WITH ('type'='hive');

U kunt doorlopende query's uitvoeren boven op deze tabellen

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Schrijf naar sinktabel uit brontabel:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Afhankelijkheden toevoegen

JAR-instructies worden gebruikt om gebruikers-JAR-bestanden toe te voegen aan het klassepad of gebruikers-JAR's te verwijderen uit het klassepad of om toegevoegde JAR's weer te geven in het klassepad in de runtime.

Flink SQL ondersteunt de volgende JAR-instructies:

  • JAR TOEVOEGEN
  • JARS WEERGEVEN
  • JAR VERWIJDEREN
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Catalogi bieden metagegevens, zoals databases, tabellen, partities, weergaven en functies en informatie die nodig is voor toegang tot gegevens die zijn opgeslagen in een database of andere externe systemen.

In HDInsight op AKS ondersteunen we twee catalogusopties:

GenericInMemoryCatalog

De GenericInMemoryCatalog is een in-memory implementatie van een catalogus. Alle objecten zijn alleen beschikbaar voor de levensduur van de SQL-sessie.

HiveCatalog-

De HiveCatalog- dient twee doeleinden; als permanente opslag voor pure Flink-metagegevens en als interface voor het lezen en schrijven van bestaande Hive-metagegevens.

Notitie

HDInsight op AKS-clusters wordt geleverd met een geïntegreerde optie van Hive Metastore voor Apache Flink. U kunt tijdens het maken van cluster kiezen voor Hive Metastore

U kunt dit artikel raadplegen over het gebruik van CLI en aan de slag met Flink SQL Client vanuit Secure Shell in Azure Portal.

  • sql-client.sh sessie starten

    Schermopname van de standaard hive-catalogus.

    Default_catalog is de standaardcatalogus in het geheugen

  • Laten we nu de standaarddatabase van de catalogus in het geheugen controleren Schermopname met standaard catalogi in het geheugen.

  • Laten we Hive Catalog van versie 3.1.2 maken en gebruiken

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Notitie

    HDInsight op AKS ondersteunt Hive 3.1.2 en Hadoop 3.3.2. De hive-conf-dir is ingesteld op locatie /opt/hive-conf

  • Laten we de Database in de hive-catalogus maken en deze standaard instellen voor de sessie (tenzij gewijzigd). Schermopname van het maken van een database in hive-catalogus en het maken van de standaardcatalogus voor de sessie.

Hive-tabellen maken en registreren bij Hive-catalogus

  • Volg de instructies voor Het maken en registreren van Flink Databases voor Catalog

  • Laten we Flink Table van connectortype Hive maken zonder partitie

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Gegevens invoegen in hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Gegevens lezen uit hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Notitie

    Hive Warehouse Directory bevindt zich in de aangewezen container van het opslagaccount dat is gekozen tijdens het maken van het Apache Flink-cluster, vindt u in directory hive/warehouse/

  • Hiermee kunt u Flink Table van verbindingslijntype Hive maken met Partition

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Belangrijk

Er is een bekende beperking in Apache Flink. De laatste 'n' kolommen worden gekozen voor partities, ongeacht de door de gebruiker gedefinieerde partitiekolom. FLINK-32596 De partitiesleutel is onjuist wanneer u Flink dialect gebruikt om een Hive-tabel te maken.

Referentie