API de Hive Warehouse Connector 2.0 en Azure HDInsight
En este artículo se enumeran todas las API que admite Hive Warehouse Connector 2.0. Todos los ejemplos que se muestran explican cómo ejecutar usando un shell de Spark y una sesión de Hive Warehouse Connector.
Creación de una sesión de Hive Warehouse Connector:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Requisito previo
Complete los pasos de configuración de Hive Warehouse Connector.
API admitidas
Establecimiento de la base de datos
hive.setDatabase("<database-name>")
Enumeración de todas las bases de datos
hive.showDatabases()
Enumeración de todas las tablas de la base de datos actual
hive.showTables()
Descripción de una tabla
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Quitar una base de datos
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Eliminación de una tabla de la base de datos actual
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Crear una base de datos
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Creación de una tabla en la base de datos actual
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
El generador para crear tablas solo admite las siguientes operaciones:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Nota:
Esta API crea una tabla con formato ORC en la ubicación predeterminada. Para conocer otras características u opciones o para crear una tabla usando consultas de Hive, use la API
executeUpdate
.Lectura de una tabla
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Ejecución de comandos DDL en HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Ejecución de una consulta de Hive y carga del resultado en el conjunto de datos
Ejecución de una consulta a través de demonios LLAP (opción recomendada)
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Ejecución de una consulta a través de HiveServer2 mediante JDBC
Establecimiento de
spark.datasource.hive.warehouse.smartExecution
enfalse
en configuraciones de Spark antes de iniciar una sesión en la plataforma para usar esta APIhive.execute("<hive-query>")
Cierre de una sesión de Hive Warehouse Connector
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Ejecución de una consulta de combinación de Hive
Esta API crea una consulta de combinación de Hive con el formato
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
El generador admite las siguientes operaciones:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Escritura de un conjunto de datos en una tabla de Hive por lotes
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName debe tener el formato
<db>.<table>
o<table>
. Si no se proporciona ningún nombre de base de datos, la tabla buscará o creará en la base de datos actual.Los tipos de SaveMode son los siguientes:
Append: anexa el conjunto de datos a la tabla dada.
Overwrite: sobrescribe los datos de la tabla dada con el conjunto de datos.
Ignore: omite la escritura si la tabla ya existe, y no se produce ningún error.
ErrorIfExists: se produce un error si ya existe la tabla.
Escritura de un conjunto de datos en una tabla de Hive usando HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Nota:
Al escribir secuencias siempre se anexan datos.
Escritura de una secuencia de Spark en una tabla de Hive
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()