Sdílet prostřednictvím


Rozhraní TABLE API a SQL v clusterech Apache Flink® ve službě HDInsight v AKS

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Další informace najdete v tomto oznámení .

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. doplňkové podmínky použití pro verze preview Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, ve verzi preview nebo jinak dosud nebyly vydány k obecné dostupnosti. Informace o této konkrétní verzi preview najdete v tématu Azure HDInsight ve službě AKS ve verzi previewu. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost na AskHDInsight s podrobnostmi a sledujte nás pro další aktualizace v komunitě Azure HDInsight .

Apache Flink nabízí dvě relační rozhraní API – Table API a SQL – pro jednotné zpracování streamů a dávek. Rozhraní Table API je rozhraní API pro dotazy integrované v jazyce, které umožňuje intuitivně skládat dotazy z relačních operátorů, jako je výběr, filtrování a spojení. Podpora jazyka SQL Flink je založená na Apache Calcite, který implementuje standard SQL.

Rozhraní Table API a rozhraní SQL se bezproblémově integrují s Flinkovým rozhraním DataStream API. Můžete snadno přepínat mezi všemi rozhraními API a knihovnami, které na nich vycházejí.

Stejně jako ostatní moduly SQL fungují dotazy Flink nad tabulkami. Liší se od tradiční databáze, protože Flink nespravuje neaktivní uložená data místně; místo toho dotazy pracují nepřetržitě s externími tabulkami.

Kanály zpracování dat Flink začínají zdrojovými tabulkami a končí výstupními tabulkami. Zdrojové tabulky vytvářejí řádky, nad kterými se provádí operace během provádění dotazu. Jsou to tabulky odkazované v klauzuli FROM dotazu. Konektory mohou být typu HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databázové systémy, souborové systémy nebo jakýkoli jiný systém, jehož konektor je v classpath.

V tomto článku se dozvíte, jak používat rozhraní příkazového řádku ze služby Secure Shell na webu Azure Portal. Tady je několik rychlých ukázek, jak začít.

  • Spuštění klienta SQL

    ./bin/sql-client.sh
    
  • Předání inicializačního souboru SQL ke spuštění společně s sql-clientem

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Nastavení konfigurace v sql-clientu

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL podporuje následující příkazy CREATE.

  • CREATE TABLE
  • VYTVOŘIT DATABÁZI
  • VYTVOŘIT KATALOG

Následuje příklad syntaxe definování zdrojové tabulky pomocí konektoru jdbc pro připojení k MSSQL s ID, název jako sloupce v příkazu CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

Vytvořit databázi (CREATE DATABASE):

CREATE DATABASE students;

VYTVOŘIT KATALOG:

CREATE CATALOG myhive WITH ('type'='hive');

V horní části těchto tabulek můžete spouštět průběžné dotazy.

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Zapište do Sink Table z Source Table:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Přidání závislostí

Příkazy JAR se používají k přidání uživatelských JARů do cesty ke třídám, odebrání uživatelských JARů z cesty ke třídám, nebo zobrazení přidaných JARů v cestě ke třídám během běhu programu.

Flink SQL podporuje následující příkazy JAR:

  • PŘIDAT JAR
  • ZOBRAZIT DÓZY
  • ODEBRAT JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Katalogy poskytují metadata, jako jsou databáze, tabulky, oddíly, zobrazení a funkce a informace potřebné pro přístup k datům uloženým v databázi nebo jiných externích systémech.

Ve službě HDInsight na AKS pro Flink podporujeme dvě možnosti katalogu:

GenericInMemoryCatalog

GenericInMemoryCatalog je katalog implementovaný v paměti. Všechny objekty jsou k dispozici pouze po celou dobu životnosti relace SQL.

Hive Katalog

HiveCatalog slouží dvěma účelům; jako trvalé úložiště pro čistě metadata Flink a jako rozhraní pro čtení a zápis existujících metadat Hive.

Poznámka

HDInsight v clusterech AKS obsahuje integrovanou možnost metastoru Hive pro Apache Flink. Při vytváření clusteru můžete zvolit Hive Metastore

V tomto článku se dozvíte, jak používat rozhraní příkazového řádku a jak začít používat Flink SQL Client z Secure Shell na webu Azure Portal.

  • Zahájení relace sql-client.sh

    Snímek obrazovky s výchozím katalogem Hive

    Default_catalog je výchozí katalog v paměti.

  • Teď zkontrolujeme výchozí databázi katalogu v paměti Snímek obrazovky s výchozími katalogy v paměti.

  • Pojďme vytvořit katalog Hive verze 3.1.2 a použít ho

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Poznámka

    HDInsight v AKS podporuje Hive 3.1.2 a Hadoop 3.3.2. hive-conf-dir je nastaveno na umístění /opt/hive-conf

  • Pojďme vytvořit databázi v katalogu Hive a nastavit ji jako výchozí pro relaci (pokud se nezmění). snímek obrazovky znázorňující vytvoření databáze v katalogu Hive a nastavení výchozího katalogu pro relaci

Vytvoření a registrace tabulek Hive do katalogu Hive

  • Postupujte podle pokynů na Vytvoření a registrace databází Flink v katalogu

  • Vytvořme Flink tabulku typu konektoru Hive bez dělení.

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Vložení dat do hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Čtení dat z hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Poznámka

    Adresář skladu Hive se nachází v určeném kontejneru účtu úložiště zvoleného při vytváření clusteru Apache Flink. Najdete ho v adresáři hive/warehouse/

  • Vytvořme tabulku Flink typu konektoru hive s particí.

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Důležitý

Apache Flink má známé omezení. Poslední sloupce "n" se vyberou pro oddíly bez ohledu na sloupec oddílu definovaný uživatelem. FLINK-32596 Při vytváření tabulky Hive pomocí dialektu Flink bude klíč oddílu chybný.

Odkaz