Apache Spark プールを使用したデータ ラングリング (非推奨)
適用対象: Python SDK azureml v1
警告
Python SDK v1 で利用できる Azure Machine Learning との Azure Synapse Analytics の統合は非推奨になっています。 ユーザーは、Azure Machine Learning に登録された Synapse ワークスペースをリンク サービスとして引き続き使用できます。 ただし、新しい Synapse ワークスペースは、リンクされたサービスとして Azure Machine Learning に登録できなくなります。 CLI v2 と Python SDK v2 で利用できるサーバーレス Spark コンピューティングと、アタッチされた Synapse Spark プールの使用をお勧めします。 詳細については、https://aka.ms/aml-spark を参照してください。
この記事では、Azure Synapse Analytics を利用して、Jupyter ノートブック内の専用 Synapse セッション内で、データ ラングリング タスクを対話的に実行する方法を学習します。 これらのタスクは Azure Machine Learning Python SDK を利用します。 Azure Machine Learning パイプラインの詳細については、「機械学習パイプライン内で (Azure Synapse Analytics を利用する) Apache Spark を使用する方法 (プレビュー)」を参照してください。 Synapse ワークスペースで Azure Synapse Analytics を使用する方法の詳細については、「Azure Synapse Analytics の概要シリーズ」を参照してください。
Azure Machine Learning と Azure Synapse Analytics の統合
Azure Synapse Analytics の Azure Machine Learning との統合 (プレビュー) を使用すると、対話的なデータ探索とデータ準備のための (Azure Synapse を利用する) Apache Spark プールをアタッチすることができます。 この統合により、機械学習モデルのトレーニングに使用する Python ノートブック内からそのすべてを利用することができる、大規模なデータ ラングリング用の専用のコンピューティング リソースを確保できます。
前提条件
開発環境を構成して Azure Machine Learning SDK をインストールするか、SDK が既にインストールされている Azure Machine Learning コンピューティング インスタンスを使用します
Azure Machine Learning Python SDK をインストールします。
Azure portal、Web ツール、または Synapse Studio を使用して Apache Spark プールを作成する
次のコードを使用して、
azureml-synapse
パッケージ (プレビュー) をインストールする。pip install azureml-synapse
Azure Machine Learning Python SDK または Azure Machine Learning スタジオを使用して Azure Machine Learning ワークスペースと Azure Synapse Analytics ワークスペースをリンクする
コンピューティング先として Synapse Spark プールをアタッチする
データ ラングリング タスク用に Synapse Spark プールを起動する
Apache Spark プールを使用したデータ準備を開始するには、アタッチされた Spark Synapse のコンピューティング名を指定します。 この名前は、Azure Machine Learning スタジオの [アタッチされたコンピューティング] タブで確認できます。
重要
引き続き Apache Spark プールを使用するためには、データ ラングリング タスクでどのコンピューティング リソースを使用するのかを指定する必要があります。 単一コード行には %synapse
を使用し、複数行には %%synapse
を使用します。
%synapse start -c SynapseSparkPoolAlias
セッションの開始後、以下のようにセッションのメタデータを確認できます。
%synapse meta
Apache Spark セッション中に使用する Azure Machine Learning 環境を指定できます。 この環境に指定された Conda 依存関係のみが有効になります。 Docker イメージはサポートされていません。
警告
環境 Conda 依存関係に指定された Python 依存関係は、Apache Spark プールではサポートされません。 現時点では、固定の Python バージョンのみがサポートされています。Python のバージョンを確認するにはスクリプトに sys.version_info
を含めてください
次のコードは、環境変数 myenv
を作成し、セッションの開始前に azureml-core
のバージョン 1.20.0 および numpy
のバージョン 1.17.0 をインストールします。 その後、Apache Spark セッションの start
ステートメントにこの環境を含めることができます。
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
カスタム環境内の Apache Spark プールを使用したデータ準備を開始するには、Apache Spark プール名と Apache Spark セッション中に使用する環境を指定します。 サブスクリプション ID、機械学習ワークスペースのリソース グループ、機械学習ワークスペースの名前を指定することができます。
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
ストレージからデータを読み込む
Apache Spark セッションが開始されたら、準備したいデータを読み取ります。 データの読み込みは、Azure Blob Storage および Azure Data Lake Storage Generation 1 および 2 でサポートされます。
これらのストレージ サービスからデータを読み込むには、以下の 2 つの選択肢があります。
Hadoop 分散ファイル システム (HDFS) パスを使用して、ストレージから直接データを読み込む
既存の Azure Machine Learning データセットからデータを読み取る
これらのストレージ サービスにアクセスするには、ストレージ BLOB データ閲覧者のアクセス許可が必要です。 これらのストレージ サービスにデータを書き戻すには、ストレージ BLOB データ共同作成者アクセス許可が必要となります。 詳細については、ストレージのアクセス許可とロールに関するページを参照してください。
Hadoop Distributed Files System (HDFS) パスを使用してデータを読み込む
対応する HDFS パスを使用してストレージからデータの読み込みと読み取りを行うには、データ アクセス認証の資格情報が利用できる必要があります。 これらの資格情報は、ストレージの種類によって異なります。 次のコード サンプルは、Shared Access Signature (SAS) トークンまたはアクセス キーを使用して、Azure Blob Storage から Spark データフレームにデータを読み取る方法を示しています。
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
次のコード サンプルは、サービス プリンシパル資格情報を使用して、Azure Data Lake Storage Generation 1 (ADLS Gen 1) からデータを読み取る方法を示しています。
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
次のコード サンプルは、サービス プリンシパル資格情報を使用して、Azure Data Lake Storage Generation 2 (ADLS Gen 2) からデータを読み取る方法を示しています。
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
登録済みデータセットからデータを読み取る
Spark データフレームに変換するのであれば、既存の登録済みデータセットをワークスペースに配置し、その上でデータ準備を実行することもできます。 次の例では、ワークスペースに対して認証を行い、BLOB ストレージ内のファイルを参照する登録済みの TabularDataset (blob_dset
) を取得し、それを Spark データフレームに変換します。 データセットを Spark データフレームに変換する際は、pyspark
のデータ探索ライブラリとデータ準備ライブラリを使用できます。
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
データ ラングリング タスクを実行する
データを取得して探索した後、データ ラングリング タスクを実行することができます。 次のコード サンプルは、前のセクションの HDFS の例を拡張したものです。 これは、Survivor 列に基づいて、Spark データフレーム df
内のデータのフィルター処理を行い、そのリストを Age によってグループ化します。
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
データをストレージに保存して Spark セッションを停止する
データ探索とデータ準備が完了したら、後で使用できるよう準備済みのデータを Azure 上のストレージ アカウントに保存します。 次のコード サンプルでは、準備済みのデータを Azure Blob Storage に書き戻し、training_data
ディレクトリにある元の Titanic.csv
ファイルを上書きします。 ストレージへの書き戻しを行うには、ストレージ BLOB データ共同作成者のアクセス許可が必要となります。 詳細については、「BLOB データにアクセスするための Azure ロールの割り当て」を参照してください。
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
データ準備が完了し、準備済みのデータをストレージに保存したら、次のコマンドを使用して Apache Spark プールの使用を終了します。
%synapse stop
準備済みのデータを表すデータセットを作成する
準備済みのデータをモデル トレーニングに使用する準備ができたら、Azure Machine Learning データストアを使用してストレージに接続し、Azure Machine Learning データセットで使用したいファイルを指定します。
次のコード例は
- 準備済みのデータを保存したストレージ サービスに接続するデータストアが作成済みであることを前提とします
- get() メソッドを使用して、既存のデータストア (
mydatastore
) をワークスペースws
から取得します。 mydatastore
training_data
ディレクトリ内にある準備済みのデータ ファイルを参照する FileDataset (train_ds
) を作成します- 変数
input1
を作成します。 後ほど、この変数を使用して、トレーニング タスクのためにコンピューティング先が利用できるtrain_ds
データセットのデータ ファイルを作成します。
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
ScriptRunConfig
を使用して Synapse Spark プールに実験の実行を送信する
データ ラングリング タスクを自動化して作成する準備ができたら、ScriptRunConfig オブジェクトを使用して、アタッチした Synapse Spark プールに実験の実行を送信できます。 同様に、Azure Machine Learning パイプラインがある場合は、パイプラインでのデータ準備ステップのために、コンピューティング先として Synapse Spark プールを指定するための SynapseSparkStep を使用することができます。 Synapse Spark プールでデータを利用できるかどうかはデータセットの種類で決まります。
- FileDataset の場合は、
as_hdfs()
メソッドを使用できます。 実行が送信されると、データセットは Hadoop 分散ファイル システム (HFDS) として Synapse Spark プールで利用できるようになります - TabularDataset に対しては、
as_named_input()
メソッドを使用できます
次のコード サンプルは
- 前のコード例で作成した FileDataset
train_ds
から変数input2
を作成します HDFSOutputDatasetConfiguration
クラスを使用して変数output
を作成します。 実行が完了すると、このクラスによって、実行の出力をデータセットtest
としてデータストアmydatastore
内に保存できるようになります。 Azure Machine Learning ワークスペースでは、test
データセットはregistered_dataset
という名前で登録されます- Synapse Spark プールでの実行のために、実行で使用するべき設定を構成します
- ScriptRunConfig パラメーターを以下のように定義します
- 実行に
dataprep.py
スクリプトを使用する - 入力として使用するデータと、そのデータを Synapse Spark プールで利用できるようにする方法を指定する
- 出力データ
output
を保存する場所を指定する
- 実行に
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
run_config.spark.configuration
と一般的な Spark 構成に関する詳細は、「SparkConfiguration クラス」と「Apache Spark の構成ドキュメント」を参照してください。
ScriptRunConfig
オブジェクトの設定が完了したら、実行を送信できます。
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
この例で使用する dataprep.py
スクリプトに関する情報を含め、詳細については、ノートブック例を参照してください。
データの準備が完了すると、データをトレーニング ジョブの入力として使用できるようになります。 前述のコード例では、トレーニング ジョブの入力データとして registered_dataset
を指定しています。
サンプルの Notebook
より詳しい概念や Azure Synapse Analytics と Azure Machine Learning の統合機能については、以下のノートブック例を参照してください。
- Azure Machine Learning ワークスペース内の Notebook から対話型の Spark セッションを実行します。
- コンピューティング先として Synapse Spark プールを使用して Azure Machine Learning 実験の実行を送信します。