Hive Warehouse Connector 2.0 APIs in Azure HDInsight
In diesem Artikel sind alle APIs aufgeführt, die von Hive Warehouse Connector 2.0 unterstützt werden. Alle gezeigten Beispiele veranschaulichen die Ausführung mit spark-shell und Hive Warehouse Connector-Sitzungen.
Gehen Sie wie folgt vor, um eine Hive Warehouse Connector-Sitzung zu erstellen:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
Voraussetzung
Schließen Sie die Schritte für das Hive Warehouse Connector-Setup ab.
Unterstützte APIs
Festlegen der Datenbank:
hive.setDatabase("<database-name>")
Auflisten aller Datenbanken:
hive.showDatabases()
Auflisten aller Tabellen in der aktuellen Datenbank
hive.showTables()
Beschreiben einer Tabelle
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
Löschen einer Datenbank
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
Auflisten einer Tabelle in der aktuellen Datenbank
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
Erstellen einer Datenbank
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
Erstellen einer Tabelle in der aktuellen Datenbank
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
Für den create-table-Generator werden nur die folgenden Vorgänge unterstützt:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
Hinweis
Bei dieser API wird eine Tabelle mit ORC-Formatierung an einem Standardspeicherort erstellt. Verwenden Sie die
executeUpdate
-API für andere Features bzw. Optionen oder zum Erstellen einer Tabelle mit Hive-Abfragen.Lesen einer Tabelle
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
Ausführen von DDL-Befehlen auf HiveServer2
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
Ausführen einer Hive-Abfrage und Laden des Ergebnisses in das Dataset
Ausführen einer Abfrage über LLAP-Daemons [Empfohlen]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
Ausführen einer Abfrage über HiveServer2 per JDBC
Legen Sie
spark.datasource.hive.warehouse.smartExecution
in der Spark-Konfiguration auffalse
fest, bevor Sie die Spark-Sitzung für die Verwendung dieser API starten.hive.execute("<hive-query>")
Schließen der Hive Warehouse Connector-Sitzung
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Ausführen einer Hive-MERGE-Abfrage
Diese API erstellt eine Hive-MERGE-Abfrage im folgenden Format:
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
Der Generator unterstützt die folgenden Vorgänge:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
Schreiben eines Datasets in eine Hive-Tabelle per Batchvorgang
df.write.format("com.microsoft.hwc.v2") .option("table", tableName) .mode(SaveMode.Type) .save()
„TableName“ sollte das Format
<db>.<table>
oder<table>
haben. Wenn kein Datenbankname angegeben wird, wird die Suche bzw. Erstellung der Tabelle in der aktuellen Datenbank durchgeführt.Die SaveMode-Typen lauten:
Append: Fügt das Dataset an die jeweilige Tabelle an.
Overwrite: Überschreibt die Daten in der jeweiligen Tabelle durch das Dataset.
Ignore: Überspringt den Schreibvorgang ohne Fehlerauslösung, falls die Tabelle bereits vorhanden ist.
ErrorIfExists: Löst einen Fehler aus, falls die Tabelle bereits vorhanden ist.
Schreiben eines Datasets in eine Hive-Tabelle per HiveStreaming
df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.microsoft.hwc.v2.batch.stream.write") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
Hinweis
Bei Streamschreibvorgängen werden immer Daten angefügt.
Schreiben eines Spark-Streams in eine Hive-Tabelle
stream.writeStream .format("com.microsoft.hwc.v2") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()