Apache Spark MLlib で機械学習モデルを構築する
この記事では、Apache Spark MLlib を使用して、Azure のオープン データセットに対するシンプルな予測分析を実行する機械学習アプリケーションの作成方法について説明します。 Spark には、組み込みの機械学習ライブラリが用意されています。 この例では、ロジスティック回帰による分類を使用しています。
コア SparkML と MLlib Spark ライブラリには、機械学習タスクに役立つ多数のユーティリティが用意されています。 これらのユーティリティは、次の用途に適しています。
- 分類
- クラスタリング
- 仮説テストとサンプル統計の計算
- 回帰
- 特異値分解 (SVD) と主成分分析 (PCA)
- トピックのモデリング
分類およびロジスティック回帰について
一般的な Machine Learning タスクである分類は、入力データをカテゴリに分類します。 分類アルゴリズムでは、指定された入力データに ラベル を割り当てる方法を決定しなければなりません。 たとえば、機械学習アルゴリズムを使って、株式情報を入力として受け取り、売却する必要のある株式と保有し続ける必要のある株式の 2 つのカテゴリに株式を分類することができます。
ロジスティック回帰アルゴリズムは、分類に適しています。 Spark のロジスティック回帰 API は、入力データを 2 つのグループのいずれかに分類する二項分類に適しています。 ロジスティック回帰の詳細については、Wikipedia を参照してください。
ロジスティック回帰を行うと、入力ベクトルがどちらか 1 つのグループに属している確率を予測できる ロジスティック関数 が生成されます。
NYC タクシー データの予測分析の例
最初に azureml-opendatasets
をインストールします。 データは Azure Open Datasets リソースから入手できます。 このデータセットのサブセットには、出発と到着の時刻および場所、料金、その他の属性などを含むイエロー タクシー乗車に関する情報がホストされています。
%pip install azureml-opendatasets
この記事の残りの部分では、まず Apache Spark を使用して NYC タクシー乗車のチップ データに対して分析を実行し、次に特定の乗車にチップが含まれるかどうかを予測するモデルを開発します。
Apache Spark 機械学習モデルを作成する
PySpark ノートブックを作成します。 詳細については、「ノートブックの作成」を参照してください。
このノートブックに必要な型をインポートします。
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
ここでは、MLflow を使用して、機械学習の実験と対応する実行を追跡します。 Microsoft Fabric の自動ログ記録が有効になっている場合、対応するメトリックとパラメーターが自動的にキャプチャされます。
import mlflow
入力 DataFrame を作成する
この例では、Pandas データフレームにデータを読み込み、それを Apache Spark データフレームに変換します。 この形式では、他の Apache Spark 操作を適用してデータセットをクリーンアップおよびフィルター処理できます。
次の行を新しいセルに貼り付け、これらのコードを実行して Spark DataFrame を作成します。 このステップにより、Open Datasets API を介してデータが取得されます。 このデータをフィルター処理して、特定のデータ ウィンドウを確認できます。 このコード例では、
start_date
とend_date
を使用して、1 か月分のデータを返すフィルターを適用します。from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
このコードは、データセットを約 10,000 行に減らします。 開発とトレーニングをスピードアップするために、ここではデータセットをサンプリングダウンします。
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
組み込みの
display()
コマンドを使用してデータを確認します。 このコマンドを実行することで、データのサンプルを簡単に表示したり、データの傾向をグラフィカルに調べたりすることができます。#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
データを準備する
データの準備は、機械学習プロセスの重要なステップです。 生データをクリーニング、変換、整理して、分析とモデリングに適した形にします。 このコード サンプルでは、データ準備のためのいくつかのステップを実行します。
- データセットをフィルター処理して外れ値と正しくない値を削除します
- モデルのトレーニングに必要のない列を削除します
- 生データから新しい列を作成します
- 特定のタクシー乗車にチップがあるかどうかを判断するラベルを生成します
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
その後、データに対して 2 番目のパスを繰り返して、最終的な特徴を追加します。
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
ロジスティック回帰モデルを作成する
最後のタスクでは、ラベル付けされたデータをロジスティック回帰を処理できる形式に変換します。 ロジスティック回帰アルゴリズムへの入力は、"ラベルと特徴ベクトルのペア" 構造である必要があります。ここで "特徴ベクトル" とは、入力ポイントを表す数のベクトルです。
最終的なタスク要件に基づいて、カテゴリ列を数値に変換する必要があります。 具体的には、trafficTimeBins
と weekdayString
列を整数表現に変換する必要があります。 この要件を処理するために使用できるオプションは複数あります。 次の例では、OneHotEncoder
アプローチが使用されています。
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
このアクションによって、すべての列がモデルのトレーニングに適した形式になっている新しい DataFrame が得られます。
ロジスティック回帰モデルのトレーニング
最初のタスクでは、データセットをトレーニング セットとテスト セットまたは検証セットに分割します。
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
2 つの DataFrame を得たら、モデル式を作成し、トレーニング DataFrame に対してそれを実行する必要があります。 その後に、テスト用 DataFrame に対して検証できます。 さまざまなバージョンのモデル式を試して、さまざまな組み合わせの効果を確認してください。
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
このセルからの出力は以下のとおりです。
Area under ROC = 0.9749430523917996
予測を視覚化する
これで、最終的な視覚化をビルドして、モデルの結果を解釈できるようになりました。 ROC 曲線は確かに結果を提示します。
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
関連するコンテンツ
- AI サンプルを使用して機械学習モデルを構築する: AI サンプルを使用する
- 実験を使用して機械学習の実行を追跡する: 機械学習の実験