Sdílet prostřednictvím


Rozhraní API konektoru Hive Warehouse 2.0 ve službě Azure HDInsight

Tento článek obsahuje seznam všech rozhraní API podporovaných konektorem Hive Warehouse 2.0. Všechny uvedené příklady ukazují, jak spustit pomocí spark-shellu a relace konektoru hive warehouse.

Vytvoření relace konektoru skladu Hive:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Požadavek

Dokončete kroky nastavení konektoru Hive Warehouse Connector.

Podporovaná rozhraní API

  • Nastavte databázi:

    hive.setDatabase("<database-name>")
    
  • Výpis všech databází:

    hive.showDatabases()
    
  • Výpis všech tabulek v aktuální databázi

    hive.showTables()
    
  • Popis tabulky

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Vyřazení databáze

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Přetažení tabulky v aktuální databázi

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Vytvořit databázi

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Vytvoření tabulky v aktuální databázi

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Tvůrce pro vytvoření tabulky podporuje pouze následující operace:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Poznámka:

    Toto rozhraní API vytvoří ve výchozím umístění tabulku ve formátu ORC. Pro další funkce nebo možnosti nebo vytvoření tabulky pomocí dotazů Hive použijte executeUpdate rozhraní API.

  • Čtení z tabulky

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Spouštění příkazů DDL na HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw exception in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Spuštění dotazu Hive a načtení výsledku v datové sadě

    • Provádění dotazu prostřednictvím démonů LLAP. [Doporučeno]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Provádění dotazu prostřednictvím HiveServer2 přes JDBC.

      Před spuštěním relace Sparku pro použití tohoto rozhraní API nastavte spark.datasource.hive.warehouse.smartExecution v false konfiguracích Sparku.

      hive.execute("<hive-query>")
      
  • Zavření relace konektoru Pro sklad Hive

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Spuštění dotazu Hive Merge

    Toto rozhraní API vytvoří dotaz sloučení Hive ve formátu.

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Tvůrce podporuje následující operace:

    mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Zápis datové sady do tabulky Hive v dávce

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName by měl mít tvar <db>.<table> nebo <table>. Pokud není zadaný žádný název databáze, bude tabulka prohledána nebo vytvořena v aktuální databázi.

    • Typy SaveMode jsou:

      • Připojení: Připojí datovou sadu k dané tabulce.

      • Přepsání: Přepíše data v dané tabulce datovou sadou.

      • Ignorovat: Přeskočí zápis, pokud tabulka již existuje, nevyvolá se žádná chyba.

      • ErrorIfExists: Vyvolá chybu, pokud tabulka již existuje.

  • Zápis datové sady do tabulky Hive pomocí HiveStreamingu

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Poznámka:

    Stream zapisuje vždy přidávaná data.

  • Zápis streamu Sparku do tabulky Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Další kroky