Dialekt Hive w klastrach Apache Flink® na platformie HDInsight na AKS
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej dzięki temu ogłoszeniu.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania platformy Microsoft Azure dotyczące wersji zapoznawczej obejmują dodatkowe warunki prawne, które odnoszą się do funkcji platformy Azure, które są w wersji beta, w wersji zapoznawczej lub w ogóle nie zostały jeszcze udostępnione w wersji ogólnodostępnej. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie dotyczące AskHDInsight, aby uzyskać więcej informacji na temat społeczności usługi Azure HDInsight.
Z tego artykułu dowiesz się, jak używać dialektu Hive w klastrach Apache Flink na HDInsight na AKS.
Wprowadzenie
Użytkownik nie może zmienić domyślnego dialektu flink
na dialekt hive podczas korzystania z HDInsight na klastrach AKS. Wszystkie operacje SQL nie powiodły się po zmianie na dialekt hive z następującym błędem.
*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*
Przyczyną tego problemu jest otwarte Hive Jira. Obecnie program Hive zakłada, że systemowy ładowarka klas jest wystąpieniem klasy URLClassLoader. W Java 11
to założenie nie jest takie.
Jak używać dialektu Hive w Flink
Wykonaj następujące kroki w webssh:
- Usuń istniejący plik flink-sql-connector-hive*jar w lokalizacji lib
rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
- Pobierz następujący plik jar w zasobniku
webssh
i dodaj go w obszarze /opt/flink-webssh/lib wget https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader/1.17.0. (Powyższy plik jar hive ma poprawkę https://issues.apache.org/jira/browse/HIVE-27508)
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/ mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
- Dodaj następujące klucze w zarządzaniu konfiguracją
flink
w sekcji core-site.xml:fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY> flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
- Usuń istniejący plik flink-sql-connector-hive*jar w lokalizacji lib
Oto przegląd zapytań w dialekcie Hadoop Hive
- Wykonywanie dialektu Hive w języku Flink bez partycjonowania
root [ ~ ]# ./bin/sql-client.sh Flink SQL> Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf'); [INFO] Execute statement succeed. Flink SQL> use catalog myhive; [INFO] Execute statement succeed. Flink SQL> load module hive; [INFO] Execute statement succeed. Flink SQL> use modules hive,core; [INFO] Execute statement succeed. Flink SQL> set table.sql-dialect=hive; [INFO] Session property has been set. Flink SQL> set sql-client.execution.result-mode=tableau; [INFO] Session property has been set. Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8 > [!WARNING] > An illegal reflective access operation has occurred > [!WARNING] > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string > [!WARNING] > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils > [!WARNING] > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations > [!WARNING] > All illegal access operations will be denied in a future release select explode(array(1,2,3)); +----+-------------+ | op | col | +----+-------------+ | +I | 1 | | +I | 2 | | +I | 3 | +----+-------------+ Received a total of 3 rows Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837 [INFO] Execute statement succeed. Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d'); [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: d0542da4c4252f9494298666ff4e9f8e Flink SQL> set execution.runtime-mode=batch; [INFO] Session property has been set. Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31 +-----+-------+ | key | value | +-----+-------+ | 1 | a | | 1 | a | | 2 | b | | 3 | c | | 3 | c | | 3 | c | | 4 | d | | 5 | e | +-----+-------+ 8 rows in set Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98 [INFO] Exiting Flink SQL CLI Client... Shutting down the session... done. root [ ~ ]# exit
Dane są zapisywane w tym samym kontenerze skonfigurowanym w katalogu hive/warehouse.
- Wykonywanie dialektu Hive w języku Flink z partycjami
create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');
Odniesienie
- Dialekt Hive w Apache Flink
- Nazwy projektów open source Apache, Apache Flink, Flink i związane z nimi dotyczą znaków towarowychApache Software Foundation (ASF).