コマンドライン ツールを使用して Spark ジョブを送信する
適用対象: SQL Server 2019 (15.x)
この記事では、コマンドライン ツールを使用して SQL Server ビッグ データ クラスターで Spark ジョブを実行する方法についてのガイダンスを提供します。
重要
Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。
前提条件
- SQL Server 2019 ビッグ データ ツール 構成され、クラスターにログインします。
azdata
- Livy への REST API 呼び出しを実行するための
curl
アプリケーション
azdata または Livy を使用する Spark ジョブ
この記事では、コマンドライン パターンを使用して Spark アプリケーションを SQL Server ビッグ データ クラスターに送信する方法を例示します。
Azure データ CLI azdata bdc spark
コマンドでは、コマンド ラインで SQL Server ビッグ データ クラスター Spark のすべての機能が提供されます。 この記事では、ジョブの送信について重点的に説明します。 ただし、azdata bdc spark
では、azdata bdc spark session
コマンドを使用して、Python、Scala、SQL、および R の対話モードもサポートされます。
REST API との直接統合が必要な場合は、標準の Livy 呼び出しを使用してジョブを送信します。 この記事では、Livy の例の curl
コマンド ライン ツールを使用して、REST API 呼び出しを実行します。 Python コードを使用して Spark Livy エンドポイントとやりとりする方法を示す詳細な例については、GitHub の Livy エンド ポイントからの Spark の使用に関するページを参照してください。
ビッグ データ クラスター Spark を使用するシンプルな ETL
この抽出、変換、読み込み (ETL) アプリケーションは、一般的な Data Engineering パターンに従います。 Apache Hadoop 分散ファイル システム (HDFS) ランディング ゾーン パスから表形式のデータが読み込まれます。 次に、テーブル形式を使用して、HDFS で処理されたゾーン パスへの書き込みが行われます。
サンプル アプリケーションのデータセットをダウンロードします。 次に、PySpark、Spark Scala、または Spark SQL を使用して、PySpark アプリケーションを作成します。
以下のセクションでは、各ソリューションのサンプル演習を示します。 ご利用のプラットフォームのタブを選択します。 azdata
または curl
を使用して、アプリケーションを実行します。
この例では、次の PySpark アプリケーションが使用されます。 これは、ローカル コンピューター上に parquet_etl_sample.py という名前の Python ファイルとして保存されます。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t',
header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
"feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
"catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
"catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
"catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")
# Print the data frame inferred schema
df.printSchema()
tot_rows = df.count()
print("Number of rows:", tot_rows)
# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")
# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")
print("Sample ETL pipeline completed")
PySpark アプリケーションを HDFS にコピーする
アプリケーションを HDFS に格納すれば、クラスターからそれにアクセスして実行することができます。 ベスト プラクティスとして、クラスター内のアプリケーションの場所を標準化および管理することで管理を効率化します。
この例のユース ケースでは、すべての ETL パイプライン アプリケーションが hdfs:/apps/ETL-Pipelines パスに格納されます。 サンプル アプリケーションは、hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py に格納されています。
次のコマンドを実行して、ローカルの開発またはステージング コンピューターから HDFS クラスターに parquet_etl_sample.py をアップロードします。
azdata bdc hdfs cp --from-path parquet_etl_sample.py --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"
Spark アプリケーションを実行する
アプリケーションを SQL Server ビッグ データ クラスター Spark に送信して実行するには、次のコマンドを使用します。
azdata
コマンドでアプリケーションを実行する場合は、一般的に指定されるパラメーターを使用します。 azdata bdc spark batch create
の完全なパラメーター オプションについては、「azdata bdc spark
」を参照してください。
このアプリケーションには、spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation
構成パラメーターが必要です。 そのため、このコマンドでは --config
オプションが使用されます。 この設定は、構成を Spark セッションに渡す方法を示しています。
--config
オプションを使用すれば、複数の構成パラメーターを指定することができます。 アプリケーション セッション内で、SparkSession
オブジェクトの構成を設定することによって、それらを指定することもできます。
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
警告
バッチ名の "name" または "n" パラメーターは、新しいバッチの作成ごとに一意である必要があります。
Spark ジョブを監視する
azdata bdc spark batch
コマンドには、Spark バッチ ジョブの管理アクションが用意されています。
実行中のジョブをすべて一覧表示するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch list -o table
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
指定された ID の Spark バッチについて情報を取得するには、次のコマンドを実行します。 spark batch create
から batch id
が返されます。
azdata
コマンドは、次のことを行います。azdata bdc spark batch info --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
指定された ID の Spark バッチについて状態情報を取得するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch state --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
指定された ID の Spark バッチについてログを取得するには、次のコマンドを実行します。
azdata
コマンドは、次のことを行います。azdata bdc spark batch log --batch-id 0
Livy を使用した
curl
コマンド:curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
次のステップ
Spark コードのトラブルシューティングの詳細については、「PySpark ノートブックのトラブルシューティング」を参照してください。
包括的な Spark サンプル コードは、GitHub の SQL Server ビッグ データ クラスター Spark のサンプルに関するページで入手できます。
SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。