次の方法で共有


コマンドライン ツールを使用して Spark ジョブを送信する

適用対象: SQL Server 2019 (15.x)

この記事では、コマンドライン ツールを使用して SQL Server ビッグ データ クラスターで Spark ジョブを実行する方法についてのガイダンスを提供します。

重要

Microsoft SQL Server 2019 ビッグ データ クラスターのアドオンは廃止されます。 SQL Server 2019 ビッグ データ クラスターのサポートは、2025 年 2 月 28 日に終了します。 ソフトウェア アシュアランス付きの SQL Server 2019 を使用する既存の全ユーザーはプラットフォームで完全にサポートされ、ソフトウェアはその時点まで SQL Server の累積更新プログラムによって引き続きメンテナンスされます。 詳細については、お知らせのブログ記事と「Microsoft SQL Server プラットフォームのビッグ データ オプション」を参照してください。

前提条件

azdata または Livy を使用する Spark ジョブ

この記事では、コマンドライン パターンを使用して Spark アプリケーションを SQL Server ビッグ データ クラスターに送信する方法を例示します。

Azure データ CLI azdata bdc spark コマンドでは、コマンド ラインで SQL Server ビッグ データ クラスター Spark のすべての機能が提供されます。 この記事では、ジョブの送信について重点的に説明します。 ただし、azdata bdc spark では、azdata bdc spark session コマンドを使用して、Python、Scala、SQL、および R の対話モードもサポートされます。

REST API との直接統合が必要な場合は、標準の Livy 呼び出しを使用してジョブを送信します。 この記事では、Livy の例の curl コマンド ライン ツールを使用して、REST API 呼び出しを実行します。 Python コードを使用して Spark Livy エンドポイントとやりとりする方法を示す詳細な例については、GitHub の Livy エンド ポイントからの Spark の使用に関するページを参照してください。

ビッグ データ クラスター Spark を使用するシンプルな ETL

この抽出、変換、読み込み (ETL) アプリケーションは、一般的な Data Engineering パターンに従います。 Apache Hadoop 分散ファイル システム (HDFS) ランディング ゾーン パスから表形式のデータが読み込まれます。 次に、テーブル形式を使用して、HDFS で処理されたゾーン パスへの書き込みが行われます。

サンプル アプリケーションのデータセットをダウンロードします。 次に、PySpark、Spark Scala、または Spark SQL を使用して、PySpark アプリケーションを作成します。

以下のセクションでは、各ソリューションのサンプル演習を示します。 ご利用のプラットフォームのタブを選択します。 azdata または curl を使用して、アプリケーションを実行します。

この例では、次の PySpark アプリケーションが使用されます。 これは、ローカル コンピューター上に parquet_etl_sample.py という名前の Python ファイルとして保存されます。

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

PySpark アプリケーションを HDFS にコピーする

アプリケーションを HDFS に格納すれば、クラスターからそれにアクセスして実行することができます。 ベスト プラクティスとして、クラスター内のアプリケーションの場所を標準化および管理することで管理を効率化します。

この例のユース ケースでは、すべての ETL パイプライン アプリケーションが hdfs:/apps/ETL-Pipelines パスに格納されます。 サンプル アプリケーションは、hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py に格納されています。

次のコマンドを実行して、ローカルの開発またはステージング コンピューターから HDFS クラスターに parquet_etl_sample.py をアップロードします。

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Spark アプリケーションを実行する

アプリケーションを SQL Server ビッグ データ クラスター Spark に送信して実行するには、次のコマンドを使用します。

azdata コマンドでアプリケーションを実行する場合は、一般的に指定されるパラメーターを使用します。 azdata bdc spark batch create の完全なパラメーター オプションについては、「azdata bdc spark」を参照してください。

このアプリケーションには、spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation 構成パラメーターが必要です。 そのため、このコマンドでは --config オプションが使用されます。 この設定は、構成を Spark セッションに渡す方法を示しています。

--config オプションを使用すれば、複数の構成パラメーターを指定することができます。 アプリケーション セッション内で、SparkSession オブジェクトの構成を設定することによって、それらを指定することもできます。

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

警告

バッチ名の "name" または "n" パラメーターは、新しいバッチの作成ごとに一意である必要があります。

Spark ジョブを監視する

azdata bdc spark batch コマンドには、Spark バッチ ジョブの管理アクションが用意されています。

実行中のジョブをすべて一覧表示するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch list -o table
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

指定された ID の Spark バッチについて情報を取得するには、次のコマンドを実行します。 spark batch create から batch id が返されます。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch info --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

指定された ID の Spark バッチについて状態情報を取得するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch state --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

指定された ID の Spark バッチについてログを取得するには、次のコマンドを実行します。

  • azdata コマンドは、次のことを行います。

    azdata bdc spark batch log --batch-id 0
    
  • Livy を使用した curl コマンド:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

次のステップ

Spark コードのトラブルシューティングの詳細については、「PySpark ノートブックのトラブルシューティング」を参照してください。

包括的な Spark サンプル コードは、GitHub の SQL Server ビッグ データ クラスター Spark のサンプルに関するページで入手できます。

SQL Server ビッグ データ クラスターおよびこれに関連するシナリオの詳細については、「SQL Server ビッグ データ クラスター」を参照してください。