Partage via


API de table et SQL dans des clusters Apache Flink® sur HDInsight et AKS

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. En savoir plus avec cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d’informations sur cette préversion spécifique, consultez les informations sur Azure HDInsight sur AKS en préversion . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Apache Flink propose deux API relationnelles ( l’API Table et SQL) pour le traitement unifié des flux et des lots. L’API Table est une API de requête intégrée au langage qui permet la composition de requêtes à partir d’opérateurs relationnels tels que la sélection, le filtre et la jointure intuitivement. La prise en charge sql de Flink est basée sur Apache Calcite, qui implémente la norme SQL.

Les interfaces TABLE et SQL s’intègrent en toute transparence avec l’API DataStream de Flink. Vous pouvez facilement basculer entre toutes les API et bibliothèques, qui s’appuient sur elles.

Comme d’autres moteurs SQL, les requêtes Flink fonctionnent sur des tables. Il diffère d’une base de données traditionnelle, car Flink ne gère pas les données au repos localement ; Au lieu de cela, ses requêtes fonctionnent en continu sur des tables externes.

Les pipelines de traitement des données Flink commencent par des tables sources et se terminent par des tables puits. Les tables sources produisent des lignes exploitées pendant l’exécution de la requête ; il s’agit des tables référencées dans la clause FROM d’une requête. Les connecteurs peuvent être de type HDInsight Kafka, HDInsight HBase, Azure Event Hubs, bases de données, systèmes de fichiers ou tout autre système dont le connecteur se trouve dans le classpath.

Vous pouvez consulter cet article sur l’utilisation de l’interface CLI à partir de secure Shell sur le portail Azure. Voici quelques exemples rapides de la prise en main.

  • Pour démarrer le client SQL

    ./bin/sql-client.sh
    
  • Pour passer un fichier sql d’initialisation à exécuter avec sql-client

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Pour définir une configuration dans sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL prend en charge les instructions CREATE suivantes

  • CRÉER TABLE
  • CRÉER UNE BASE DE DONNÉES
  • CRÉER UN CATALOGUE

Voici un exemple de syntaxe permettant de définir une table source à l’aide du connecteur jdbc pour se connecter à MSSQL, avec des colonnes id et nom dans une instruction CREATE TABLE.

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://servername.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE :

CREATE DATABASE students;

CRÉER CATALOGUE:

CREATE CATALOG myhive WITH ('type'='hive');

Vous pouvez exécuter des requêtes continues sur ces tables

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Écrire dans table récepteur à partir de table source:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Ajout de dépendances

Les instructions JAR sont utilisées pour ajouter des fichiers jar utilisateur dans le classpath ou supprimer des fichiers jar utilisateur du classpath ou afficher les fichiers jar ajoutés dans le classpath dans le runtime.

Flink SQL prend en charge les instructions JAR suivantes :

  • AJOUTER UN FICHIER JAR
  • AFFICHER LES FICHIERS JAR
  • SUPPRIMER UN FICHIER JAR
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Les catalogues fournissent des métadonnées, telles que des bases de données, des tables, des partitions, des vues et des fonctions et des informations nécessaires pour accéder aux données stockées dans une base de données ou dans d’autres systèmes externes.

Dans HDInsight sur AKS, nous prenons en charge deux options de catalogue avec Flink :

CatalogueEnMémoireGénérique

Le GenericInMemoryCatalog est une implémentation en mémoire d’un catalogue. Tous les objets sont disponibles uniquement pour la durée de vie de la session sql.

HiveCatalog

Le HiveCatalog sert à deux fins ; en tant que stockage persistant pour les métadonnées Flink pures, et en tant qu’interface pour la lecture et l’écriture de métadonnées Hive existantes.

Note

HDInsight sur les clusters AKS est fourni avec une option intégrée de Metastore Hive pour Apache Flink. Vous pouvez opter pour le metastore Hive pendant la création du cluster

Vous pouvez consulter cet article sur l’utilisation de l’interface CLI et commencer à utiliser Flink SQL Client à partir de Secure Shell sur le portail Azure.

  • Démarrer la session sql-client.sh

    Capture d’écran montrant le catalogue hive par défaut.

    Default_catalog est le catalogue en mémoire par défaut

  • Examinons maintenant la base de données par défaut du catalogue en mémoire Capture d’écran montrant les catalogues en mémoire par défaut.

  • Nous allons créer le catalogue Hive de la version 3.1.2 et l’utiliser

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Note

    HDInsight sur AKS prend en charge Hive 3.1.2 et Hadoop 3.3.2. La hive-conf-dir est assignée à l’emplacement /opt/hive-conf

  • Nous allons créer la base de données dans le catalogue hive et la rendre par défaut pour la session (sauf modification). Capture d’écran montrant la création d’une base de données dans le catalogue Hive et son catalogue par défaut pour la session.

Guide pratique pour créer et enregistrer des tables Hive dans le catalogue Hive

  • Suivez les instructions de Comment créer et inscrire des bases de données Flink dans le catalogue

  • Créons une table Flink de type de connecteur Hive sans partition

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Insérer des données dans hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Lire les données de la table Hive

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Note

    Le répertoire de l'entrepôt Hive se trouve dans le conteneur désigné du compte de stockage choisi lors de la création du cluster Apache Flink, et peut être trouvé à l'emplacement hive/warehouse.

  • Permet de créer une table Flink de type connecteur hive avec partition

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Important

Il existe une limitation connue dans Apache Flink. Les dernières colonnes « n » sont choisies pour les partitions, quelle que soit la colonne de partition définie par l’utilisateur. FLINK-32596 La clé de partition est incorrecte lors de l’utilisation du dialecte Flink pour créer une table Hive.

Référence