Azure HDInsight 内の Hive Warehouse Connector API
この記事では、Hive Warehouse Connector でサポートされているすべての API の一覧を示します。 以下に示すすべての例は、spark-shell と Hive Warehouse Connector セッションを使用して実行されます。
Hive Warehouse Connector セッションを作成する方法:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
前提条件
Hive Warehouse Connector の設定ステップを完了します。
サポート対象 API
データベースを設定する:
hive.setDatabase("<database-name>")
すべてのデータベースを一覧表示する:
hive.showDatabases()
現在のデータベース内のすべてのテーブルを一覧表示する
hive.showTables()
テーブルについて説明する
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
データベースの削除
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
現在のデータベース内のテーブルを削除する
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
データベースを作成する
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
現在のデータベース内にテーブルを作成する
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
create-table のビルダーでは、次の操作だけがサポートされます。
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
注意
この API により、既定の場所に ORC 形式のテーブルが作成されます。 その他の機能やオプションの場合、または Hive クエリを使用してテーブルを作成するには、
executeUpdate
API を使用します。テーブルの読み取り
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
HiveServer2 に対して DDL コマンドを実行する
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Hive クエリを実行し、データセットに結果を読み込む
LLAP デーモンを介してクエリを実行する。 [推奨]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
JDBC を介して HiveServer2 を通じてクエリを実行する。
Spark セッションを開始してこの API を使用する前に、Spark 構成で
spark.datasource.hive.warehouse.smartExecution
をfalse
に設定しますhive.execute("<hive-query>")
Hive Warehouse Connector セッションを終了する
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Hive マージ クエリを実行する
この API では、次の形式の Hive マージ クエリが作成されます
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
ビルダーでは次の操作がサポートされます。
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
データセットを Hive テーブルにバッチで書き込む
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName は
<db>.<table>
または<table>
の形式である必要があります。 データベース名が指定されていない場合、テーブルは現在のデータベース内で検索または作成されますSaveMode の種類は次のとおりです。
追加: 指定されたテーブルにデータセットを追加します
上書き: 指定されたテーブル内のデータをデータセットで上書きします
無視: テーブルが既に存在する場合は書き込みをスキップし、エラーはスローしません
ErrorIfExists: テーブルが既に存在する場合にエラーをスローします
HiveStreaming を使用して Hive テーブルにデータセットを書き込む
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
注意
ストリーム書き込みでは常にデータが追加されます。
Hive テーブルへの Spark ストリームの書き込み
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()