Hive dialect in Apache Flink-clusters® in HDInsight op AKS
Belangrijk
Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Leer meer over uit deze aankondiging.
U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.
Belangrijk
Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Zie Azure HDInsight in AKS preview-informatievoor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.
In dit artikel leert u hoe u Hive-dialect gebruikt in Apache Flink-clusters in HDInsight op AKS.
Introductie
De gebruiker kan het standaard-flink
dialect niet wijzigen in hive dialect voor hun gebruik in HDInsight op AKS-clusters. Alle SQL-bewerkingen mislukken zodra ze zijn gewijzigd in hive dialect met de volgende fout.
*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*
De reden voor dit probleem is het gevolg van een open Hive Jira. Momenteel gaat Hive ervan uit dat het systeemklasselaadprogramma een exemplaar van URLClassLoader is. In Java 11
is deze aanname niet het geval.
Hive dialect gebruiken in Flink
Voer de volgende stappen uit in webssh-:
- Verwijder de bestaande flink-sql-connector-hive*jar in lib-locatie
rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
- Download de volgende jar in
webssh
pod en voeg het toe onder de /opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0. (De bovenstaande Hive Jar heeft de fix https://issues.apache.org/jira/browse/HIVE-27508)
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/ mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
- Voeg de volgende sleutels toe in het
flink
configuratiebeheer onder core-site.xml sectie:fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY> flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
- Verwijder de bestaande flink-sql-connector-hive*jar in lib-locatie
Hier volgt een overzicht van hive-dialectqueries
- Hive dialect uitvoeren in Flink zonder partitionering
root [ ~ ]# ./bin/sql-client.sh Flink SQL> Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf'); [INFO] Execute statement succeed. Flink SQL> use catalog myhive; [INFO] Execute statement succeed. Flink SQL> load module hive; [INFO] Execute statement succeed. Flink SQL> use modules hive,core; [INFO] Execute statement succeed. Flink SQL> set table.sql-dialect=hive; [INFO] Session property has been set. Flink SQL> set sql-client.execution.result-mode=tableau; [INFO] Session property has been set. Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8 > [!WARNING] > An illegal reflective access operation has occurred > [!WARNING] > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string > [!WARNING] > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils > [!WARNING] > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations > [!WARNING] > All illegal access operations will be denied in a future release select explode(array(1,2,3)); +----+-------------+ | op | col | +----+-------------+ | +I | 1 | | +I | 2 | | +I | 3 | +----+-------------+ Received a total of 3 rows Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837 [INFO] Execute statement succeed. Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d'); [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: d0542da4c4252f9494298666ff4e9f8e Flink SQL> set execution.runtime-mode=batch; [INFO] Session property has been set. Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31 +-----+-------+ | key | value | +-----+-------+ | 1 | a | | 1 | a | | 2 | b | | 3 | c | | 3 | c | | 3 | c | | 4 | d | | 5 | e | +-----+-------+ 8 rows in set Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98 [INFO] Exiting Flink SQL CLI Client... Shutting down the session... done. root [ ~ ]# exit
De gegevens worden geschreven in dezelfde geconfigureerde container in de hive/warehouse-directory.
- Het uitvoeren van de Hive-dialect in Flink met behulp van partities
create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');
Referentie
- Hive Dialect in Apache Flink
- Apache, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).