Partilhar via


APIs do Conector do Hive Warehouse no Azure HDInsight

Este artigo lista todas as APIs suportadas pelo conector do armazém do Hive. Todos os exemplos apresentados abaixo são executados com a sessão do conector spark-shell e do hive warehouse.

Como criar a sessão do conector do armazém do Hive:

import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()

Pré-requisito

Conclua os passos de configuração do Hive Warehouse Connector .

APIs suportadas

  • Defina a base de dados:

    hive.setDatabase("<database-name>")
    
  • Listar todas as bases de dados:

    hive.showDatabases()
    
  • Listar todas as tabelas na base de dados atual

    hive.showTables()
    
  • Descrever uma tabela

    // Describes the table <table-name> in the current database
    hive.describeTable("<table-name>")
    
    // Describes the table <table-name> in <database-name>
    hive.describeTable("<database-name>.<table-name>")
    
  • Remover uma base de dados

    // ifExists and cascade are boolean variables
    hive.dropDatabase("<database-name>", ifExists, cascade)
    
  • Remover uma tabela na base de dados atual

    // ifExists and purge are boolean variables
    hive.dropTable("<table-name>", ifExists, purge)
    
  • Criar uma base de dados

    // ifNotExists is boolean variable
    hive.createDatabase("<database-name>", ifNotExists)
    
  • Criar uma tabela na base de dados atual

    // Returns a builder to create table
    val createTableBuilder = hive.createTable("<table-name>")
    

    O construtor para create-table suporta apenas as operações abaixo:

    // Create only if table does not exists already
    createTableBuilder = createTableBuilder.ifNotExists()
    
    // Add columns
    createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
    
    // Add partition column
    createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
    
    // Add table properties
    createTableBuilder = createTableBuilder.prop("<key>", "<value>")
    
    // Creates a bucketed table,
    // Parameters are numOfBuckets (integer) followed by column names for bucketing
    createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
    
    // Creates the table
    createTableBuilder.create()
    

    Nota

    Esta API cria uma tabela formatada em ORC na localização predefinida. Para outras funcionalidades/opções ou para criar tabelas com consultas do Hive, utilize executeUpdate a API.

  • Ler tabelas

    // Returns a Dataset<Row> that contains data of <table-name> in the current database
    hive.table("<table-name>")
    
  • Executar comandos DDL no HiveServer2

    // Executes the <hive-query> against HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>")
    
    // Executes the <hive-query> against HiveServer2
    // Throws exception, if propagateException is true and query threw exception in HiveServer2
    // Returns true or false if the query succeeded or failed respectively
    hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
    
  • Executar a consulta e o resultado de carregamento do Hive no Conjunto de Dados

    • Executar a consulta através de daemons LLAP. [Recomendado]

      // <hive-query> should be a hive query 
      hive.executeQuery("<hive-query>")
      
    • Executar a consulta através do HiveServer2 através de JDBC.

      Defina spark.datasource.hive.warehouse.smartExecution como false em configurações do Spark antes de iniciar a sessão do Spark para utilizar esta API

      hive.execute("<hive-query>")
      
  • Fechar sessão do conector do armazém do Hive

    // Closes all the open connections and
    // release resources/locks from HiveServer2
    hive.close()
    
  • Executar a consulta de Intercalação do Hive

    Esta API cria uma consulta de intercalação do Hive com o formato abaixo

    MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias>
    ON <onExpr>
    WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN>
    WHEN MATCHED [AND <deleteExpr>] THEN DELETE
    WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
    
    val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
    

    O Builder suporta as seguintes operações:

    mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
    
    mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
    
    mergeBuilder.on("<onExpr>")
    
    mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
    
    mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
    
    mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
    
    // Executes the merge query
    mergeBuilder.merge()
    
  • Escrever um Conjunto de Dados na Tabela do Hive em lote

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector")
       .option("table", tableName)
       .mode(SaveMode.Type)
       .save()
    
    • TableName deve ser de formulário <db>.<table> ou <table>. Se não for fornecido nenhum nome de base de dados, a tabela será pesquisada/criada na base de dados atual

    • Os tipos de SaveMode são:

      • Acrescentar: acrescenta o conjunto de dados à tabela especificada

      • Substituir: substitui os dados na tabela especificada pelo conjunto de dados

      • Ignorar: ignora a escrita se a tabela já existir, sem erros gerados

      • ErrorIfExists: gera um erro se a tabela já existir

  • Escrever um Conjunto de Dados na Tabela do Hive com o HiveStreaming

    df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("metastoreUri", "<HMS_URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    
     // To write to static partition
     df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource")
       .option("database", databaseName)
       .option("table", tableName)
       .option("partition", partition)
       .option("metastoreUri", "<HMS URI>")
    // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
       .save()
    

    Nota

    As escritas do Stream acrescentam sempre dados.

  • Escrever um fluxo do Spark numa Tabela do Hive

    stream.writeStream
        .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource")
        .option("metastoreUri", "<HMS_URI>")
        .option("database", databaseName)
        .option("table", tableName)
      //.option("partition", partition) , add if inserting data in partition
      //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster
        .start()