Azure HDInsight 中 Hive Warehouse Connector 支援的 Apache Spark 作業
本文說明 Hive Warehouse Connector (HWC) 支援的 Spark 型作業。 顯示的所有範例都會透過 Apache Spark 殼層執行。
先決條件
完成 Hive Warehouse Connector 設定步驟。
開始使用
若要啟動 spark-shell 工作階段,請執行下列步驟:
使用 ssh 命令來連線到 Apache Spark 叢集。 將 CLUSTERNAME 取代為您的叢集名稱以編輯命令,然後輸入命令:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
從 ssh 工作階段執行下列命令,以記下
hive-warehouse-connector-assembly
版本:ls /usr/hdp/current/hive_warehouse_connector
使用上述版本編輯程序代碼
hive-warehouse-connector-assembly
。 然後執行命令以啟動 Spark 殼層:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
啟動 spark 殼層之後,可以使用下列命令啟動 Hive Warehouse Connector 實例:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
使用 Hive 查詢建立 Spark 資料框架
使用 HWC 程式庫的所有查詢結果都會以資料框架的形式傳回。 下列範例示範如何建立基本 Hive 查詢。
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
查詢結果是 Spark 資料框架,可以與像是 MLIB 和 SparkSQL 的 Spark 程式庫搭配使用。
將 Spark 資料框架寫出至 Hive 資料表
Spark 原生不支援寫入 Hive 的受控 ACID 資料表。 不過,使用 HWC 時,您可以將任何資料框架寫出至 Hive 資料表。 您可以在下列範例中看到這項功能運作:
建立名為
sampletable_colorado
的資料表,並使用下列命令來指定其資料行:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
篩選資料表
hivesampletable
,其中資料行state
等於Colorado
。 此 Hive 查詢會使用write
函式,並傳回儲存在 Hive 資料表sampletable_colorado
中的 Spark DataFrame 與結果。hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
使用下列命令來檢視結果:
hive.table("sampletable_colorado").show()
結構化串流寫入
使用 Hive Warehouse Connector,您也可以使用 Spark 串流將資料寫入至 Hive 資料表。
重要
在已啟用 ESP 的 Spark 4.0 叢集中,不支援結構化串流寫入。
請遵循步驟,透過localhost埠9999將Spark資料流中的數據內嵌至Hive資料表。 Hive Warehouse Connector。
從您的開啟 Spark 殼層,使用下列命令開始 Spark 串流:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
執行下列步驟,為您建立的 Spark 串流產生資料:
- 在相同的 Spark 叢集中開啟第二個 SSH 工作階段。
- 在命令提示字元中,輸入
nc -lk 9999
。 此命令會使用netcat
公用程式,從命令列將資料傳送至指定的連接埠。
返回第一個 SSH 工作階段,並建立新的 Hive 資料表來保存串流資料。 在 spark-shell 中,輸入下列命令:
hive.createTable("stream_table").column("value","string").create()
然後使用下列命令,將串流資料寫入至新建立的資料表:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
重要
因為 Apache Spark 中有已知問題,所以
metastoreUri
和database
選項目前必須手動設定。 如需有關此問題的詳細資訊,請參閱 SPARK-25460。返回第二個 SSH 工作階段,然後輸入下列值:
foo HiveSpark bar
返回第一個 SSH 工作階段,並記下簡短的活動。 請使用下列命令以檢視資料:
hive.table("stream_table").show()
使用 Ctrl + C,在第二個 SSH 工作階段上停止 netcat
。 使用 :q
,在第一個 SSH 工作階段上結束 spark-shell。