Condividi tramite


API del connettore Hive Warehouse in Azure HDInsight

Questo articolo elenca tutte le API supportate dal connettore hive warehouse. Tutti gli esempi illustrati di seguito vengono eseguiti usando la sessione del connettore spark-shell e hive warehouse.

Come creare una sessione del connettore Hive warehouse:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Prerequisito

Completare la procedura di configurazione di Hive Warehouse Connector.

API supportate

  • Impostare il database:

    hive.setDatabase("<database-name>")
    
  • Elencare tutti i database:

    hive.showDatabases()
    
  • Elencare tutte le tabelle nel database corrente

    hive.showTables()
    
  • Descrivere una tabella

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Eliminare un database

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Eliminare una tabella nel database corrente

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Creazione di un database

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Creare una tabella nel database corrente

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    Generatore per create-table supporta solo le operazioni seguenti:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Nota

    Questa API crea una tabella formattata ORC nella posizione predefinita. Per altre funzionalità/opzioni o per creare una tabella usando query hive, usare l'API executeUpdate .

  • Leggere una tabella

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Eseguire comandi DDL in HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw exception in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Eseguire query Hive e il risultato del caricamento nel set di dati

    • Esecuzione di query tramite daemon LLAP. [Consigliato]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Esecuzione di query tramite HiveServer2 tramite JDBC.

      Impostare spark.datasource.hive.warehouse.smartExecution su false in configurazioni spark prima di avviare la sessione spark per usare questa API

      hive.execute("<hive-query>")
      
  • Chiudere la sessione del connettore Hive warehouse

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Eseguire query di merge Hive

    Questa API crea una query di merge Hive del formato seguente

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    Builder supporta le operazioni seguenti:

    mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Scrivere un set di dati in una tabella Hive in batch

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName deve essere di formato <db>.<table> o <table>. Se non viene specificato alcun nome di database, la tabella verrà eseguita la ricerca/creazione nel database corrente

    • I tipi SaveMode sono:

      • Accodamento: aggiunge il set di dati alla tabella specificata

      • Sovrascrivere: sovrascrive i dati nella tabella specificata con set di dati

      • Ignora: ignora la scrittura se la tabella esiste già, nessun errore generato

      • ErrorIfExists: genera un errore se la tabella esiste già

  • Scrivere un set di dati in una tabella Hive usando HiveStreaming

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Nota

    I flussi scrivono sempre dati di accodamento.

  • Scrittura di un flusso spark in una tabella Hive

    stream.writeStream
        .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()