Azure HDInsight の Hive Warehouse Connector でサポートされる Apache Spark の操作
この記事では、Hive Warehouse Connector (HWC) でサポートされる Spark ベースの操作について説明します。 示されるすべての例は、Apache Spark シェルを通じて実行されます。
前提条件
Hive Warehouse Connector の設定ステップを完了します。
作業の開始
spark-shell セッションを開始するには、次の手順を実行します。
ssh コマンドを使用して Apache Spark クラスターに接続します。 CLUSTERNAME をクラスターの名前に置き換えてコマンドを編集してから、そのコマンドを入力します。
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
SSH セッションから次のコマンドを実行して、
hive-warehouse-connector-assembly
のバージョンを確認します。ls /usr/hdp/current/hive_warehouse_connector
上で特定した
hive-warehouse-connector-assembly
のバージョンを使用して、コードを編集します。 次のコマンドを実行して、spark シェルを起動します。spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
spark-shell を開始した後、次のコマンドを使用して Hive Warehouse Connector インスタンスを開始できます。
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Hive クエリを使用した Spark DataFrame の作成
HWC ライブラリを使用したすべてのクエリの結果は、DataFrame として返されます。 次の例では、基本的な Hive クエリを作成する方法を示します。
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
クエリの結果は Spark DataFrame です。これは、MLIB や SparkSQL のような Spark ライブラリと共に使用できます。
Spark DataFrame から Hive テーブルへの書き出し
Spark では、Hive によって管理される ACID テーブルへの書き込みはネイティブにサポートされていません。 ただし、HWC を使用すると、どの DataFrame も Hive テーブルに書き出すことができます。 次の例では、この機能の動作を確認できます。
次のコマンドを使用し、
sampletable_colorado
というテーブルを作成してその列を指定します。hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
テーブル
hivesampletable
をフィルターします (列state
=Colorado
)。 この Hive クエリからは Spark DataFrame が返され、結果はwrite
関数を使用して Hive テーブルsampletable_colorado
に保存されます。hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
次のコマンドを使用して結果を表示します。
hive.table("sampletable_colorado").show()
構造化ストリーミングの書き込み
Hive Warehouse Connector を使用すると、Spark ストリーミングを使って Hive テーブルにデータを書き込むことができます。
重要
構造化ストリーミングの書き込みは、ESP が有効になっている Spark 4.0 クラスターではサポートされていません。
手順に従って、localhost ポート 9999 の Spark ストリームから Hive テーブルにデータを取り込みます。 (Hive Warehouse Connector 経由)、次の手順のようにします。
開いている Spark シェルから、次のコマンドを使用して Spark ストリームを開始します。
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
次の手順を実行して、作成した Spark ストリームのためのデータを生成します。
- 同じ Spark クラスターで2番目の SSH セッションを開きます。
- コマンド プロンプトで、「
nc -lk 9999
」と入力します。 このコマンドでは、netcat
ユーティリティを使用して、コマンド ラインから指定のポートにデータを送信します。
最初の SSH セッションに戻り、ストリーミング データを保持する新しい Hive テーブルを作成します。 spark-shell で、次のコマンドを入力します。
hive.createTable("stream_table").column("value","string").create()
その後、次のコマンドを使用して、新しく作成したテーブルにストリーミング データを書き込みます。
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
重要
現在、
metastoreUri
およびdatabase
オプションは、Apache Spark の既知の問題のため、手動で設定する必要があります。 この問題の詳細については、SPARK-25460 を参照してください。2番目の SSH セッションに戻り、次の値を入力します。
foo HiveSpark bar
最初の SSH セッションに戻り、この簡単なアクティビティに注目します。 次のコマンドを使用して、データを表示します。
hive.table("stream_table").show()
2 番目の SSH セッションで、Ctrl + C を使用して netcat
を停止します。 最初の SSH セッションで、:q
を使用して spark-shell を終了します。