แก้ไข

แชร์ผ่าน


Hive Warehouse Connector 2.0 APIs in Azure HDInsight

This article lists all the APIs supported by Hive warehouse connector 2.0. All the examples shown are how to run using spark-shell and hive warehouse connector session.

How to create Hive warehouse connector session:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Prerequisite

Complete the Hive Warehouse Connector setup steps.

Supported APIs

  • Set the database:

    hive.setDatabase("<database-name>")
    
  • List all databases:

    hive.showDatabases()
    
  • List all tables in the current database

    hive.showTables()
    
  • Describe a table

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Drop a database

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Drop a table in the current database

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Create a database

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Create a table in current database

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Builder for create-table supports only the below operations:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Note

    This API creates an ORC formatted table at default location. For other features/options or to create table using hive queries, use executeUpdate API.

  • Read a table

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Execute DDL commands on HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw exception in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Execute Hive query and load result in Dataset

    • Executing query via LLAP daemons. [Recommended]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Executing query through HiveServer2 via JDBC.

      Set spark.datasource.hive.warehouse.smartExecution to false in spark configs before starting spark session to use this API

      hive.execute("<hive-query>")
      
  • Close Hive warehouse connector session

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Execute Hive Merge query

    This API creates a Hive merge query in the format

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Builder supports the following operations:

    mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Write a Dataset to Hive Table in batch

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName should be of form <db>.<table> or <table>. If no database name is provided, the table will searched/created in the current database

    • SaveMode types are:

      • Append: Appends the dataset to the given table

      • Overwrite: Overwrites the data in the given table with dataset

      • Ignore: Skips write if table already exists, no error thrown

      • ErrorIfExists: Throws error if table already exists

  • Write a Dataset to Hive Table using HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Note

    Stream writes always append data.

  • Writing a spark stream to a Hive Table

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Next steps