API Hive Warehouse Connector 2.0 в Azure HDInsight
В этой статье перечислены все API, поддерживаемые Hive Warehouse Connector 2.0. Все приведенные примеры— запуск с помощью сеанса соединителя spark-shell и хранилища hive.
Создание сеанса Hive Warehouse Connector.
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Предварительные требования
Выполните инструкции из статьи Настройка Hive Warehouse Connector.
Поддерживаемые API
Настройка базы данных:
hive.setDatabase("<database-name>")
Вывод списка всех баз данных:
hive.showDatabases()
Вывод списка всех таблиц в текущей базе данных
hive.showTables()
Описание таблицы
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Удаление базы данных
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Удаление таблицы из текущей базы данных
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Создание базы данных
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Создание таблицы в текущей базе данных
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Построитель для создания таблиц поддерживает только следующие операции.
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Примечание.
Этот программный интерфейс создает таблицу в формате ORC в расположении по умолчанию. Для других функций/параметров или для создания таблицы с помощью запросов Hive используйте API
executeUpdate
.Чтение из таблицы
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Выполнение команд DDL в HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Выполнение запроса Hive и загрузка результата в наборе данных
Выполнение запроса с помощью управляющих программ LLAP. [Рекомендуется]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Выполнение запроса с помощью HiveServer2 через JDBC.
Чтобы использовать этот API, задайте для параметра
spark.datasource.hive.warehouse.smartExecution
значениеfalse
в настройках Spark, прежде чем запустить сеанс Spark.hive.execute("<hive-query>")
Закрытие сеанса Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Выполнение запроса на слияние Hive
Этот API создает запрос слияния Hive в формате
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Построитель поддерживает следующие операции.
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Запись набора данных в таблицу Hive в пакетной службе
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
Имя таблицы TableName должно иметь вид
<db>.<table>
или<table>
. Если имя базы данных не указано, поиск по таблице или создание таблицы будет выполняться в текущей базе данных.Типы SaveMode:
Добавить: добавляет набор данных в заданную таблицу.
Перезаписать: перезаписывает данные в заданной таблице с помощью набора данных.
Игнорировать: пропускает запись, если таблица уже существует; ошибка не выдается.
ErrorIfExists: выдает ошибку, если таблица уже существует.
Запись набора данных в таблицу Hive с помощью HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Примечание.
Потоковая запись всегда добавляет данные.
Запись потока Spark в таблицу Hive
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()
Следующие шаги
- Операции HWC и Apache Spark
- Use Interactive Query with HDInsight (Использование Interactive Query в HDInsight)
- Интеграция HWC с Apache Zeppelin
- Отправка приложений Spark с помощью программы Spark-submit