Azure HDInsight의 Hive Warehouse Connector API
이 문서에는 Hive Warehouse Connector에서 지원하는 모든 API가 나와 있습니다. 아래에 표시된 모든 예제는 spark-shell 및 Hive Warehouse Connector 세션을 사용하여 실행됩니다.
Hive Warehouse Connector 세션을 만드는 방법:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
전제 조건
Hive Warehouse Connector 설정 단계를 완료합니다.
지원되는 API
데이터베이스 설정:
hive.setDatabase("<database-name>")
모든 데이터베이스 나열:
hive.showDatabases()
현재 데이터베이스에 있는 모든 테이블 나열
hive.showTables()
테이블 설명
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
데이터베이스 삭제
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
현재 데이터베이스에서 테이블 삭제
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
데이터베이스 만들기
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
현재 데이터베이스에서 테이블 만들기
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
테이블 생성용 작성기는 아래 작업만 지원합니다.
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
참고 항목
이 API는 기본 위치에 ORC 형식의 테이블을 만듭니다. 다른 기능/옵션을 사용하거나 Hive 쿼리를 사용하여 테이블을 만들려면
executeUpdate
API를 사용합니다.테이블 읽기
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
HiveServer2에서 DDL 명령 실행
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
데이터 세트에서 Hive 쿼리 실행 및 결과 로드
LLAP 디먼을 통해 쿼리 실행 [권장]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
JDBC를 통해 HiveServer2에서 쿼리 실행
이 API를 사용하려면 Spark 세션을 시작하기 전에 Spark 구성에서
spark.datasource.hive.warehouse.smartExecution
을false
로 설정합니다.hive.execute("<hive-query>")
Hive Warehouse Connector 세션 닫기
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
Hive 병합 쿼리 실행
이 API는 아래 형식의 Hive 병합 쿼리를 만듭니다.
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
작성기는 다음 작업을 지원합니다.
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
일괄 처리로 Hive 테이블에 데이터 세트 쓰기
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName은
<db>.<table>
또는<table>
형식이어야 합니다. 데이터베이스 이름을 제공하지 않으면 현재 데이터베이스에서 테이블이 검색/생성됩니다.SaveMode 유형은 다음과 같습니다.
Append: 지정된 테이블에 데이터 세트를 추가합니다.
Overwrite: 지정된 테이블의 데이터를 데이터 세트로 덮어씁니다.
Ignore: 테이블이 이미 있는 경우 쓰기를 건너뛰고 오류가 throw되지 않습니다.
ErrorIfExists: 테이블이 이미 있는 경우 오류를 throw합니다.
HiveStreaming을 사용하여 Hive 테이블에 데이터 세트 쓰기
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
참고 항목
스트림 쓰기는 항상 데이터를 추가합니다.
Hive 테이블에 Spark 스트림 쓰기
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()