Delen via


Hive dialect in Apache Flink-clusters® in HDInsight op AKS

Belangrijk

Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Leer meer over uit deze aankondiging.

U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.

Belangrijk

Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.

In dit artikel leert u hoe u Hive-dialect gebruikt in Apache Flink-clusters in HDInsight op AKS.

Introductie

De gebruiker kan het standaard-flink dialect niet wijzigen in hive dialect voor hun gebruik in HDInsight op AKS-clusters. Alle SQL-bewerkingen mislukken zodra ze zijn gewijzigd in hive dialect met de volgende fout.


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

De reden voor dit probleem is het gevolg van een open Hive Jira. Momenteel gaat Hive ervan uit dat het systeemklasselaadprogramma een exemplaar van URLClassLoader is. In Java 11is deze aanname niet het geval.

  • Voer de volgende stappen uit in webssh-:

    1. Verwijder de bestaande flink-sql-connector-hive*jar in lib-locatie
      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. Download de volgende jar in webssh pod en voeg het toe onder de /opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0. (De bovenstaande Hive Jar heeft de fix https://issues.apache.org/jira/browse/HIVE-27508)
    mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
    mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
    
    1. Voeg de volgende sleutels toe in het flink configuratiebeheer onder core-site.xml sectie:
      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • Hier volgt een overzicht van hive-dialectqueries

    • Hive dialect uitvoeren in Flink zonder partitionering
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    De gegevens worden geschreven in dezelfde geconfigureerde container in de hive/warehouse-directory.

    Schermopname van containertabel 1.

    • Het uitvoeren van de Hive-dialect in Flink met behulp van partities
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

Schermopname van containertabel 2.

Schermopname van containertabel 3.

Referentie