Apache Spark を使用した Apache HBase データの読み取り/書き込み
通常、Apache HBase は、低レベルの API (スキャン、取得、および配置) または Apache Phoenix を使用した SQL 構文のいずれかでクエリされます。 Apache には、Apache Spark HBase コネクタも用意されています。 そのコネクタは、HBase によって格納されたデータに対してクエリと変更を行うための便利で効率的な代替手段です。
前提条件
2 つの異なる HDInsight クラスターが、同じ仮想ネットワークにデプロイされていること。 一方は HBase で、もう一方は Spark 2.1 (HDInsight 3.6) 以降がインストールされた Spark です。 詳細については、「Azure Portal を使用した HDInsight の Linux ベースのクラスターの作成」をご覧ください。
クラスターのプライマリ ストレージの URI スキーム。 このスキーマは、Azure Blob Storage では wasb://、Azure Data Lake Storage Gen2 では
abfs://
、Azure Data Lake Storage Gen1 では adl:// になります。 Blob Storage で安全な転送が有効になっている場合、URI はwasbs://
になります。 安全な転送に関するページも参照してください。
全体的なプロセス
Spark クラスターが HBase クラスターのクエリを実行できるようにするためのプロセスの概要は次のとおりです。
- HBase でいくつかのサンプル データを準備します。
- HBase クラスター構成フォルダー (/etc/hbase/conf) から hbase-site.xml ファイルを取得し、hbase-site.xml のコピーを Spark 2 構成フォルダー (/etc/spark2/conf) に入れます。 (省略可能: このプロセスを自動化するには、HDInsight チームによって提供されるスクリプトを使用します)
packages
オプションで、Maven 座標を使用して Spark HBase コネクターを参照するspark-shell
を実行します。- Spark のスキーマを HBase にマップするカタログを定義します。
- RDD または DataFrame API を使用して HBase データと対話します。
Apache HBase でサンプル データを準備する
この手順では、Spark を使用してクエリできるテーブルを Apache HBase に作成し、データを設定します。
ssh
コマンドを使用して HBase クラスターに接続します。HBASECLUSTER
を実際の HBase クラスターの名前に置き換えてコマンドを編集し、そのコマンドを入力します。ssh sshuser@HBASECLUSTER-ssh.azurehdinsight.net
hbase shell
コマンドを使用して、HBase 対話型シェルを起動します。 SSH 接続で次のコマンドを入力します。hbase shell
create
コマンドを使用して、2 つの列ファミリを持つ HBase テーブルを作成します。 次のコマンドを入力します。create 'Contacts', 'Personal', 'Office'
put
コマンドを使用して、特定のテーブルの指定行の指定列に値を挿入します。 次のコマンドを入力します。put 'Contacts', '1000', 'Personal:Name', 'John Dole' put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001' put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002' put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.' put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji' put 'Contacts', '8396', 'Personal:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Phone', '230-555-0191' put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'
exit
コマンドを使用して、HBase 対話型シェルを停止します。 次のコマンドを入力します。exit
スクリプトを実行してクラスター間の接続を設定する
クラスター間の通信を設定するには、手順に従って、クラスターで 2 つのスクリプトを実行します。 これらのスクリプトは、「通信を手動で設定する」セクションで説明するファイルのコピーのプロセスを自動化します。
- HBase クラスターから実行するスクリプトは、Spark クラスターに接続されている既定のストレージに
hbase-site.xml
と HBase IP マッピング情報をアップロードします。 - Spark クラスターから実行するスクリプトは、2 つのヘルパー スクリプトを定期的に実行する 2 つの cron ジョブを設定します。
- HBase cron ジョブ – Spark の既定のストレージ アカウントからローカル ノードに新しい
hbase-site.xml
ファイルと HBase IP マッピングをダウンロードします - Spark cron ジョブ – Spark のスケーリングが発生したかどうかと、クラスターがセキュリティで保護されているかどうかを検査します。 そうである場合は、
/etc/hosts
を編集して、ローカルに格納されている HBase IP マッピングを含めます。
- HBase cron ジョブ – Spark の既定のストレージ アカウントからローカル ノードに新しい
注: 続行する前に、Spark クラスターのストレージ アカウントをセカンダリ ストレージ アカウントとして HBase クラスターに追加したことを確認してください。 スクリプトは示されている順番になるようにしてください。
HBase クラスターでスクリプト アクションを使用して、次の点を考慮して変更を適用します。
プロパティ 値 Bash スクリプト URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-hbase.sh
ノードの種類 リージョン パラメーター -s SECONDARYS_STORAGE_URL -d "DOMAIN_NAME
Persisted はい SECONDARYS_STORAGE_URL
は、Spark 側の既定ストレージの URL です。 パラメーターの例:-s wasb://sparkcon-2020-08-03t18-17-37-853z@sparkconhdistorage.blob.core.windows.net -d "securehadooprc"
Spark クラスターでスクリプト アクションを使用して、次の点を考慮して変更を適用します。
プロパティ 値 Bash スクリプト URI https://hdiconfigactions2.blob.core.windows.net/hbasesparkconnect/connector-spark.sh
ノードの種類 ヘッド、ワーカー、Zookeeper パラメーター -s "SPARK-CRON-SCHEDULE" (optional) -h "HBASE-CRON-SCHEDULE" (optional) -d "DOMAIN_NAME" (mandatory)
Persisted はい - このクラスターで更新プログラムを自動的にチェックする頻度を指定できます。 既定値: -s “*/1 * * * *” -h 0 (この例では、Spark cron は毎分実行されますが、HBase cron は実行されません)
- HBase cron は既定では設定されないため、HBase クラスターへのスケーリングを実行するときに、このスクリプトを再実行する必要があります。 HBase クラスターが頻繁にスケーリングされる場合は、HBase cron ジョブを自動的に設定することを選択できます。 例:
-s '*/1 * * * *' -h '*/30 * * * *' -d "securehadooprc"
は、30 分ごとにチェックを実行するようにスクリプトを構成します。 これにより、HBase cron スケジュールが定期的に実行され、共通ストレージ アカウントの新しい HBase 情報がローカル ノードに自動的にダウンロードされます。
Note
これらのスクリプトは、HDI 5.0 および HDI 5.1 クラスターでのみ機能します。
通信を手動で設定する (オプション、上記の手順で指定したスクリプトが失敗した場合)
注: これらの手順は、クラスターの 1 つでスケーリング アクティビティが行われるたびに実行する必要があります。
ローカル ストレージから Spark クラスターの既定ストレージのルートに hbase-site.xml をコピーします。 コマンドを編集して構成を反映します。 次に、開いている HBase クラスターへの SSH セッションから、以下のコマンドを入力します。
構文の値 新しい値 URI スキーム ストレージを反映するように変更します。 この構文は、安全な転送が有効になった Blob ストレージ用です。 SPARK_STORAGE_CONTAINER
Spark クラスターで使用される既定のストレージ コンテナー名に置き換えます。 SPARK_STORAGE_ACCOUNT
Spark クラスターで使用される既定のストレージ アカウント名に置き換えます。 hdfs dfs -copyFromLocal /etc/hbase/conf/hbase-site.xml wasbs://SPARK_STORAGE_CONTAINER@SPARK_STORAGE_ACCOUNT.blob.core.windows.net/
次に、HBase クラスターへの SSH 接続を終了します。
exit
SSH を使用して、Spark クラスターのヘッド ノードに接続します。
SPARKCLUSTER
を Spark クラスターの名前に置き換えてコマンドを編集し、そのコマンドを入力します。ssh sshuser@SPARKCLUSTER-ssh.azurehdinsight.net
次のコマンドを入力して、Spark クラスターの既定のストレージから、クラスターのローカル ストレージの Spark 2 構成フォルダーに
hbase-site.xml
をコピーします。sudo hdfs dfs -copyToLocal /hbase-site.xml /etc/spark2/conf
Spark HBase コネクターを参照する Spark Shell を実行する
前の手順を完了すると、適切なバージョンの Spark HBase コネクタを参照して、Spark シェルを実行できるようになります。
例として、次の表では、HDInsight チームが現在使用している 2 つのバージョンと対応するコマンドを示します。 表で示されているように HBase と Spark のバージョンが同じ場合は、クラスターに同じバージョンを使用できます。
開いている Spark クラスターへの SSH セッションで、下のコマンドを入力して、Spark シェルを開始します。
Spark のバージョン HDI HBase のバージョン SHC のバージョン コマンド 2.1 HDI 3.6 (HBase 1.1) 1.1.1-2.1-s_2.11 spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories https://repo.hortonworks.com/content/groups/public/
この Spark シェル インスタンスを開いたままで、カタログとクエリを定義します。 SHC コア リポジトリ内のバージョンに対応する jar が見つからない場合は、読み進めてください。
以降の Spark および HBase のバージョンの組み合わせの場合、これらの成果物が上のリポジトリで公開されなくなります。 jar は、spark-hbase-connector GitHub ブランチから直接ビルドできます。 たとえば、Spark 2.4 と HBase 2.1 を使用して実行する場合は、次の手順を完了します。
リポジトリを複製します。
git clone https://github.com/hortonworks-spark/shc
ブランチ - 2.4 にアクセスします。
git checkout branch-2.4
ブランチからビルドします (.jar ファイルを作成します)。
mvn clean package -DskipTests
次のコマンドを実行します (作成した .jar ファイルに対応するよう .jar の名前を変更してください)。
spark-shell --jars <path to your jar>,/usr/hdp/current/hbase-client/lib/shaded-clients/*
この Spark シェル インスタンスを開いたままで、次のセクションに進みます。
カタログとクエリを定義する
この手順では、Apache Spark のスキーマを Apache HBase にマップするカタログ オブジェクトを定義します。
開いている Spark シェルで、次の
import
ステートメントを入力します。import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.datasources.hbase._ import org.apache.spark.{SparkConf, SparkContext} import spark.sqlContext.implicits._
以下のコマンドを入力して、HBase で作成した Contacts テーブルのカタログを定義します。
def catalog = s"""{ |"table":{"namespace":"default", "name":"Contacts"}, |"rowkey":"key", |"columns":{ |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"}, |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"}, |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"}, |"personalName":{"cf":"Personal", "col":"Name", "type":"string"}, |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"} |} |}""".stripMargin
コード:
Contacts
という名前の HBase テーブルのカタログ スキーマを定義します。- rowkey を
key
として指定し、Spark で使用されている列名を、HBase で使用されている列ファミリ、列名、列タイプにマップします。 rowkey
の特定の列ファミリcf
を持つ名前付きの列 (rowkey
) として、rowkey を詳細に定義します。
次のコマンドを入力して、HBase で
Contacts
テーブルの DataFrame を提供するメソッドを定義します。def withCatalog(cat: String): DataFrame = { spark.sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.spark.sql.execution.datasources.hbase") .load() }
DataFrame のインスタンスを作成します。
val df = withCatalog(catalog)
DataFrame のクエリを実行します。
df.show()
2 行のデータが表示されます。
+------+--------------------+--------------+-------------+--------------+ |rowkey| officeAddress| officePhone| personalName| personalPhone| +------+--------------------+--------------+-------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+-------------+--------------+
一時テーブルを登録して、Spark SQL を使用して HBase テーブルのクエリを実行できるようにします。
df.createTempView("contacts")
contacts
テーブルに対して SQL クエリを発行します。spark.sqlContext.sql("select personalName, officeAddress from contacts").show
次のような結果が表示されます。
+-------------+--------------------+ | personalName| officeAddress| +-------------+--------------------+ | John Dole|1111 San Gabriel Dr.| | Calvin Raji|5415 San Gabriel Dr.| +-------------+--------------------+
新しい行を挿入する
Contact の新しいレコードを挿入するには、
ContactRecord
クラスを定義します。case class ContactRecord( rowkey: String, officeAddress: String, officePhone: String, personalName: String, personalPhone: String )
ContactRecord
のインスタンスを作成し、配列に挿入します。val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194") var newData = new Array[ContactRecord](1) newData(0) = newContact
新しいデータの配列を HBase に保存します。
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
結果を確認します。
df.show()
次のような出力結果が表示されます。
+------+--------------------+--------------+------------+--------------+ |rowkey| officeAddress| officePhone|personalName| personalPhone| +------+--------------------+--------------+------------+--------------+ | 1000|1111 San Gabriel Dr.|1-425-000-0002| John Dole|1-425-000-0001| | 16891| 40 Ellis St.| 674-555-0110|John Jackson| 230-555-0194| | 8396|5415 San Gabriel Dr.| 230-555-0191| Calvin Raji| 230-555-0191| +------+--------------------+--------------+------------+--------------+
次のコマンドを入力して、Spark シェルを閉じます。
:q