API Table e SQL nei cluster Apache Flink® su HDInsight su AKS
Importante
Azure HDInsight su AKS è stato ritirato il 31 gennaio 2025. Scopri di più con questo annuncio.
È necessario eseguire la migrazione dei carichi di lavoro a Microsoft Fabric o a un prodotto Azure equivalente per evitare la chiusura brusca dei carichi di lavoro.
Importante
Questa funzionalità è attualmente in anteprima. Le condizioni supplementari per l'utilizzo per le anteprime di Microsoft Azure includono termini legali più validi applicabili alle funzionalità di Azure in versione beta, in anteprima o altrimenti non ancora rilasciate nella disponibilità generale. Per informazioni su questa anteprima specifica, vedere informazioni sull'anteprima di Azure HDInsight su AKS. Per domande o suggerimenti sulle funzionalità, invia una richiesta su AskHDInsight con i dettagli e segui la nostra pagina per ulteriori aggiornamenti sulla Community di Azure HDInsight.
Apache Flink include due API relazionali, l'API Tabella e SQL, per l'elaborazione unificata di flussi e batch. L'API Table è un'API di query integrata nel linguaggio che consente la composizione di query da operatori relazionali, ad esempio selezione, filtro e join in modo intuitivo. Il supporto sql di Flink è basato su Apache Calcite, che implementa lo standard SQL.
L'API Table e le interfacce SQL si integrano perfettamente tra loro e l'API DataStream di Flink. È possibile passare facilmente tra tutte le API e le librerie, basate su di esse.
Apache Flink SQL
Analogamente ad altri motori SQL, le query Flink operano sulle tabelle. Differisce da un database tradizionale perché Flink non gestisce i dati inattivi localmente; Le query funzionano invece in modo continuo su tabelle esterne.
Le pipeline di elaborazione dei dati Flink iniziano con le tabelle di origine e terminano con le tabelle sink. Le tabelle di origine producono righe manipolate durante l'esecuzione della query; sono le tabelle a cui si fa riferimento nella clausola FROM di una query. I connettori possono essere di tipo HDInsight Kafka, HDInsight HBase, Hub eventi di Azure, database, file system o qualsiasi altro sistema il cui connettore si trova nel classpath.
Uso di Flink SQL Client in HDInsight nei cluster AKS
È possibile consultare questo articolo per imparare a usare l'interfaccia a riga di comando da Secure Shell nel portale di Azure. Ecco alcuni esempi rapidi di come iniziare.
Per avviare il client SQL
./bin/sql-client.sh
Per passare un file sql di inizializzazione da eseguire insieme a sql-client
./sql-client.sh -i /path/to/init_file.sql
Per impostare una configurazione in sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL supporta le istruzioni CREATE seguenti
- CREA TABELLA
- CREATE DATABASE
- Crea catalogo
Di seguito è riportata una sintassi di esempio per definire una tabella di origine usando il connettore jdbc per connettersi a MSSQL, con ID, nome come colonne in un'istruzione CREATE TABLE
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
CREA DATABASE :
CREATE DATABASE students;
CREATE CATALOG:
CREATE CATALOG myhive WITH ('type'='hive');
È possibile eseguire query continue nella parte superiore di queste tabelle
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Scrivere nella Tabella di destinazione da Tabella di origine :
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Aggiunta di dipendenze
Le istruzioni JAR vengono usate per aggiungere file JAR utente nel classpath o rimuovere i file JAR utente dal classpath o visualizzare i file JAR aggiunti nel classpath nel runtime.
Flink SQL supporta le istruzioni JAR seguenti:
- AGGIUNGI JAR
- MOSTRA BARATTOLI
- RIMUOVI BARATTOLO
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Metastore Hive nei cluster Apache Flink® su HDInsight su AKS
I cataloghi forniscono metadati, ad esempio database, tabelle, partizioni, viste e funzioni e informazioni necessarie per accedere ai dati archiviati in un database o in altri sistemi esterni.
In HDInsight su AKS, Flink supporta due opzioni di catalogo:
GenericInMemoryCatalog
Il GenericInMemoryCatalog è un'implementazione in memoria di un catalogo. Tutti gli oggetti sono disponibili solo per la durata della sessione sql.
HiveCatalog
Il HiveCatalog serve due scopi; come risorsa di archiviazione permanente per i metadati Flink puri e come interfaccia per la lettura e la scrittura di metadati Hive esistenti.
Nota
HDInsight nei cluster AKS include un'opzione integrata di Hive Metastore per Apache Flink. È possibile scegliere Hive Metastore durante la creazione del cluster
Come creare e registrare database Flink nei cataloghi
È possibile fare riferimento a questo articolo su come usare l'interfaccia della riga di comando e iniziare a usare Flink SQL Client da Secure Shell nel portale di Azure.
Avvia
sql-client.sh
sessioneDefault_catalog è il catalogo in memoria predefinito
Ora è possibile controllare il database predefinito del catalogo in memoria
Creare il catalogo Hive della versione 3.1.2 e usarlo
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Nota
HDInsight nel servizio Azure Kubernetes supporta hive 3.1.2 e Hadoop 3.3.2. Il
hive-conf-dir
è impostato sulla posizione/opt/hive-conf
Creare database nel catalogo hive e impostarlo come predefinito per la sessione (a meno che non venga modificato).
Come creare e registrare tabelle Hive nel catalogo Hive
Seguire le istruzioni in Come creare e registrare database Flink nel catalogo
Lasciaci creare una tabella Flink di tipo connettore Hive senza partizione.
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Inserisci dati in hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Leggi i dati da hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Nota
La directory hive warehouse si trova nel contenitore designato dell'account di archiviazione scelto durante la creazione del cluster Apache Flink, disponibile nella directory hive/warehouse/
Creiamo una tabella Flink del tipo di connettore hive con partizione.
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Importante
Esiste una limitazione nota in Apache Flink. Le ultime colonne 'n' vengono scelte per le partizioni, indipendentemente dalla colonna di partizione definita dall'utente. FLINK-32596 La chiave di partizione non è corretta quando si usa il dialetto Flink per creare una tabella Hive.
Riferimento
- API per tabelle di Apache Flink & SQL
- Apache, Apache Flink, Flink e i nomi dei progetti open source associati sono marchi della Apache Software Foundation (ASF).