APIs do Hive Warehouse Connector 2.0 no Azure HDInsight
Este artigo lista todas as APIs com suporte do Hive Warehouse Connector 2.0. Todos os exemplos mostrados são de como executar usando o spark-shell e a sessão do conector do Hive Warehouse.
Como criar a sessão do Hive Warehouse Connector:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Pré-requisito
Conclua as etapas da instalação do Hive Warehouse Connector.
APIs com suporte
Definir o banco de dados:
hive.setDatabase("<database-name>")
Listar todos os bancos de dados:
hive.showDatabases()
Listar todas as tabelas no banco de dados atual
hive.showTables()
Descrever uma tabela
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Remover um banco de dados
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Remover uma tabela do banco de dados atual
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Criar um banco de dados
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Criar uma tabela no banco de dados atual
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
O construtor da criação de tabelas dá suporte apenas às operações abaixo:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Observação
Essa API cria uma tabela formatada em ORC no local padrão. Para ver outros recursos/opções ou criar uma tabela com consultas do Hive, use a API
executeUpdate
.Ler uma tabela
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Executar comandos DDL no HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Executar a consulta do Hive e carregar o resultado em um conjunto de dados
Como executar consultas por meio de daemons LLAP. [Recomendado]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Como executar consultas com o HiveServer2 por meio do JDBC.
Definir
spark.datasource.hive.warehouse.smartExecution
comofalse
nas configurações do Spark antes de iniciar a sessão do Spark para usar essa APIhive.execute("<hive-query>")
Encerrar a sessão do Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Executar consultas de mesclagem do Hive
Esta API cria uma consulta de mesclagem do Hive no formato
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
O construtor dá suporte às seguintes operações:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Gravar um conjuntos de dados na tabela do Hive em lote
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
O TableName deve ter o formato
<db>.<table>
ou<table>
. Se nenhum nome de banco de dados for fornecido, a tabela será pesquisada/criada no banco de dados atualOs tipos de SaveMode são:
Anexar: anexa o conjuntos de dados à tabela especificada
Substituir: substitui os dados na tabela especificada por um conjuntos de dados
Ignorar: ignora a gravação caso a tabela já exista; nenhum erro é lançado
ErrorIfExists: lançará um erro se a tabela já existir
Gravar um conjunto de dados na tabela do Hive usando HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Observação
As gravações de fluxo sempre acrescentam dados.
Como gravar um fluxo do Spark em uma tabela do Hive
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()