Condividi tramite


API Table e SQL nei cluster Apache Flink® su HDInsight su AKS

Importante

Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più con questo annuncio.

È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.

Importante

Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, invia una richiesta su AskHDInsight con i dettagli e segui la nostra pagina per ulteriori aggiornamenti sulla Community di Azure HDInsight.

Apache Flink include due API relazionali, l'API Tabella e SQL, per l'elaborazione unificata di flussi e batch. L'API Table è un'API di query integrata nel linguaggio che consente la composizione di query da operatori relazionali, ad esempio selezione, filtro e join in modo intuitivo. Il supporto sql di Flink è basato su Apache Calcite, che implementa lo standard SQL.

L'API Table e le interfacce SQL si integrano perfettamente tra loro e l'API DataStream di Flink. È possibile passare facilmente tra tutte le API e le librerie, basate su di esse.

Analogamente ad altri motori SQL, le query Flink operano sulle tabelle. Differisce da un database tradizionale perché Flink non gestisce i dati inattivi localmente; Le query funzionano invece in modo continuo su tabelle esterne.

Le pipeline di elaborazione dei dati Flink iniziano con le tabelle di origine e terminano con le tabelle sink. Le tabelle di origine producono righe manipolate durante l'esecuzione della query; sono le tabelle a cui si fa riferimento nella clausola FROM di una query. I connettori possono essere di tipo HDInsight Kafka, HDInsight HBase, Hub eventi di Azure, database, file system o qualsiasi altro sistema il cui connettore si trova nel classpath.

È possibile consultare questo articolo per imparare a usare l'interfaccia a riga di comando da Secure Shell nel portale di Azure. Ecco alcuni esempi rapidi di come iniziare.

  • Per avviare il client SQL

    ./bin/sql-client.sh
    
  • Per passare un file sql di inizializzazione da eseguire insieme a sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Per impostare una configurazione in sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL supporta le istruzioni CREATE seguenti

  • CREA TABELLA
  • CREATE DATABASE
  • Crea catalogo

Di seguito è riportata una sintassi di esempio per definire una tabella di origine usando il connettore jdbc per connettersi a MSSQL, con ID, nome come colonne in un'istruzione CREATE TABLE

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREA DATABASE :

CREATE DATABASE students;

CREATE CATALOG:

CREATE CATALOG myhive WITH ('type'='hive');

È possibile eseguire query continue nella parte superiore di queste tabelle

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Scrivere nella Tabella di destinazione da Tabella di origine :

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Aggiunta di dipendenze

Le istruzioni JAR vengono usate per aggiungere file JAR utente nel classpath o rimuovere i file JAR utente dal classpath o visualizzare i file JAR aggiunti nel classpath nel runtime.

Flink SQL supporta le istruzioni JAR seguenti:

  • AGGIUNGI JAR
  • MOSTRA BARATTOLI
  • RIMUOVI BARATTOLO
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

I cataloghi forniscono metadati, ad esempio database, tabelle, partizioni, viste e funzioni e informazioni necessarie per accedere ai dati archiviati in un database o in altri sistemi esterni.

In HDInsight su AKS, Flink supporta due opzioni di catalogo:

GenericInMemoryCatalog

Il GenericInMemoryCatalog è un'implementazione in memoria di un catalogo. Tutti gli oggetti sono disponibili solo per la durata della sessione sql.

HiveCatalog

Il HiveCatalog serve due scopi; come risorsa di archiviazione permanente per i metadati Flink puri e come interfaccia per la lettura e la scrittura di metadati Hive esistenti.

Nota

HDInsight nei cluster AKS include un'opzione integrata di Hive Metastore per Apache Flink. È possibile scegliere Hive Metastore durante la creazione del cluster

È possibile fare riferimento a questo articolo su come usare l'interfaccia della riga di comando e iniziare a usare Flink SQL Client da Secure Shell nel portale di Azure.

  • Avvia sql-client.sh sessione

    Screenshot che mostra il catalogo hive predefinito.

    Default_catalog è il catalogo in memoria predefinito

  • Ora è possibile controllare il database predefinito del catalogo in memoria Screenshot che mostra i cataloghi in memoria predefiniti.

  • Creare il catalogo Hive della versione 3.1.2 e usarlo

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Nota

    HDInsight nel servizio Azure Kubernetes supporta hive 3.1.2 e Hadoop 3.3.2. Il hive-conf-dir è impostato sulla posizione /opt/hive-conf

  • Creare database nel catalogo hive e impostarlo come predefinito per la sessione (a meno che non venga modificato). Una schermata che mostra la creazione del database nel catalogo Hive e come impostarlo come catalogo predefinito per la sessione.

Come creare e registrare tabelle Hive nel catalogo Hive

  • Seguire le istruzioni in Come creare e registrare database Flink nel catalogo

  • Lasciaci creare una tabella Flink di tipo connettore Hive senza partizione.

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Inserisci dati in hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Leggi i dati da hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Nota

    La directory hive warehouse si trova nel contenitore designato dell'account di archiviazione scelto durante la creazione del cluster Apache Flink, disponibile nella directory hive/warehouse/

  • Creiamo una tabella Flink del tipo di connettore hive con partizione.

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Importante

Esiste una limitazione nota in Apache Flink. Le ultime colonne 'n' vengono scelte per le partizioni, indipendentemente dalla colonna di partizione definita dall'utente. FLINK-32596 La chiave di partizione non è corretta quando si usa il dialetto Flink per creare una tabella Hive.

Riferimento