Partilhar via


APIs do Hive Warehouse Connector 2.0 no Azure HDInsight

Este artigo lista todas as APIs suportadas pelo conector de armazém do Hive 2.0. Todos os exemplos mostrados são como executar usando spark-shell e hive warehouse connector session.

Como criar uma sessão do conector de armazém do Hive:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Pré-requisito

Conclua as etapas de configuração do Hive Warehouse Connector.

APIs suportadas

  • Defina o banco de dados:

    hive.setDatabase("<database-name>")
    
  • Listar todas as bases de dados:

    hive.showDatabases()
    
  • Listar todas as tabelas no banco de dados atual

    hive.showTables()
    
  • Descrever uma tabela

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Soltar um banco de dados

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Soltar uma tabela no banco de dados atual

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Criar uma base de dados

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Criar uma tabela no banco de dados atual

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    O Builder for create-table suporta apenas as operações abaixo:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Nota

    Essa API cria uma tabela formatada em ORC no local padrão. Para outros recursos/opções ou para criar tabelas usando consultas hive, use executeUpdate a API.

  • Ler tabelas

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Executar comandos DDL no HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw exception in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Executar consulta do Hive e carregar o resultado no conjunto de dados

    • Execução de consulta via daemons LLAP. [Recomendado]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Execução de consulta através do HiveServer2 via JDBC.

      Defina spark.datasource.hive.warehouse.smartExecution como false em configurações de faísca antes de iniciar a sessão de faísca para usar essa API

      hive.execute("<hive-query>")
      
  • Fechar sessão do conector do armazém do Hive

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Executar consulta de mesclagem do Hive

    Essa API cria uma consulta de mesclagem do Hive no formato

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    O Builder suporta as seguintes operações:

    mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Gravar um conjunto de dados na tabela do Hive em lote

    df.write.format("com.microsoft.hwc.v2")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName deve ser de forma <db>.<table> ou <table>. Se nenhum nome de banco de dados for fornecido, a tabela será pesquisada/criada no banco de dados atual

    • Os tipos SaveMode são:

      • Acrescentar: Acrescenta o conjunto de dados à tabela fornecida

      • Substituir: substitui os dados na tabela fornecida pelo conjunto de dados

      • Ignorar: ignora a gravação se a tabela já existir, nenhum erro gerado

      • ErrorIfExists: Lança erro se a tabela já existir

  • Gravar um conjunto de dados na tabela do Hive usando o HiveStreaming

    df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.microsoft.hwc.v2.batch.stream.write")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Nota

    O Stream grava sempre acrescentar dados.

  • Escrevendo um fluxo de faísca em uma tabela Hive

    stream.writeStream
        .format("com.microsoft.hwc.v2")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()
    

Próximos passos