API del connettore Hive Warehouse in Azure HDInsight
Questo articolo elenca tutte le API supportate dal connettore hive warehouse. Tutti gli esempi illustrati di seguito vengono eseguiti usando la sessione del connettore spark-shell e hive warehouse.
Come creare una sessione del connettore Hive warehouse:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Prerequisito
Completare la procedura di configurazione di Hive Warehouse Connector.
API supportate
Impostare il database:
hive.setDatabase("<database-name>")
Elencare tutti i database:
hive.showDatabases()
Elencare tutte le tabelle nel database corrente
hive.showTables()
Descrivere una tabella
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Eliminare un database
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Eliminare una tabella nel database corrente
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Creazione di un database
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Creare una tabella nel database corrente
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Generatore per create-table supporta solo le operazioni seguenti:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Nota
Questa API crea una tabella formattata ORC nella posizione predefinita. Per altre funzionalità/opzioni o per creare una tabella usando query hive, usare l'API
executeUpdate
.Leggere una tabella
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Eseguire comandi DDL in HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Eseguire query Hive e il risultato del caricamento nel set di dati
Esecuzione di query tramite daemon LLAP. [Consigliato]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Esecuzione di query tramite HiveServer2 tramite JDBC.
Impostare
spark.datasource.hive.warehouse.smartExecution
sufalse
in configurazioni spark prima di avviare la sessione spark per usare questa APIhive.execute("<hive-query>")
Chiudere la sessione del connettore Hive warehouse
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Eseguire query di merge Hive
Questa API crea una query di merge Hive del formato seguente
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Builder supporta le operazioni seguenti:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Scrivere un set di dati in una tabella Hive in batch
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName deve essere di formato
<db>.<table>
o<table>
. Se non viene specificato alcun nome di database, la tabella verrà eseguita la ricerca/creazione nel database correnteI tipi SaveMode sono:
Accodamento: aggiunge il set di dati alla tabella specificata
Sovrascrivere: sovrascrive i dati nella tabella specificata con set di dati
Ignora: ignora la scrittura se la tabella esiste già, nessun errore generato
ErrorIfExists: genera un errore se la tabella esiste già
Scrivere un set di dati in una tabella Hive usando HiveStreaming
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Nota
I flussi scrivono sempre dati di accodamento.
Scrittura di un flusso spark in una tabella Hive
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()