APIs do Hive Warehouse Connector 2.0 no Azure HDInsight
Este artigo lista todas as APIs suportadas pelo conector de armazém do Hive 2.0. Todos os exemplos mostrados são como executar usando spark-shell e hive warehouse connector session.
Como criar uma sessão do conector de armazém do Hive:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Pré-requisito
Conclua as etapas de configuração do Hive Warehouse Connector.
APIs suportadas
Defina o banco de dados:
hive.setDatabase("<database-name>")
Listar todas as bases de dados:
hive.showDatabases()
Listar todas as tabelas no banco de dados atual
hive.showTables()
Descrever uma tabela
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Soltar um banco de dados
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Soltar uma tabela no banco de dados atual
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Criar uma base de dados
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Criar uma tabela no banco de dados atual
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
O Builder for create-table suporta apenas as operações abaixo:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Nota
Essa API cria uma tabela formatada em ORC no local padrão. Para outros recursos/opções ou para criar tabelas usando consultas hive, use
executeUpdate
a API.Ler tabelas
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Executar comandos DDL no HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Executar consulta do Hive e carregar o resultado no conjunto de dados
Execução de consulta via daemons LLAP. [Recomendado]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Execução de consulta através do HiveServer2 via JDBC.
Defina
spark.datasource.hive.warehouse.smartExecution
comofalse
em configurações de faísca antes de iniciar a sessão de faísca para usar essa APIhive.execute("<hive-query>")
Fechar sessão do conector do armazém do Hive
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Executar consulta de mesclagem do Hive
Essa API cria uma consulta de mesclagem do Hive no formato
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
O Builder suporta as seguintes operações:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Gravar um conjunto de dados na tabela do Hive em lote
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName deve ser de forma
<db>.<table>
ou<table>
. Se nenhum nome de banco de dados for fornecido, a tabela será pesquisada/criada no banco de dados atualOs tipos SaveMode são:
Acrescentar: Acrescenta o conjunto de dados à tabela fornecida
Substituir: substitui os dados na tabela fornecida pelo conjunto de dados
Ignorar: ignora a gravação se a tabela já existir, nenhum erro gerado
ErrorIfExists: Lança erro se a tabela já existir
Gravar um conjunto de dados na tabela do Hive usando o HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Nota
O Stream grava sempre acrescentar dados.
Escrevendo um fluxo de faísca em uma tabela Hive
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()