使用 Apache Spark MLlib 建置機器學習模型
在本文中,您將了解如何使用 Apache Spark MLlib 建立機器學習應用程式,以處理 Azure 開放資料集上的簡單預測性分析。 Spark 提供內建的機器學習程式庫。 此範例會透過羅吉斯迴歸使用分類。
核心 SparkML 和 MLlib SparkMLlib 程式庫提供許多可用於機器學習工作的工具。 這些公用程式適用於:
- 分類
- 叢集
- 假設測試和計算範例統計資料
- 迴歸
- 奇異值分解 (SVD) 和主體元件分析 (PCA)
- 主題模型化
了解分類和羅吉斯迴歸
分類是常見的機器學習工作,涉及將輸入資料依類別排序。 分類演算法應該了解如何將標籤指派給提供的輸入資料。 例如,機器學習演算法可以接受股票資訊作為輸入,並將股票分成兩個類別:您應該出售的股票,以及應該持有的股票。
羅吉斯迴歸演算法適用於分類。 Spark 的羅吉斯迴歸 API 可用於二元分類,將輸入資料歸類到兩個群組之一。 如需有關羅吉斯迴歸的詳細資訊,請參閱 Wikipedia。
羅吉斯迴歸會產生羅吉斯函式,可預測輸入向量屬於一個群組或其他群組的機率。
紐約市計程車資料的預測性分析範例
首先,安裝 azureml-opendatasets
。 此資料可透過 Azure 開放資料集資源取得。 此資料集子集裝載黃色計程車車程的相關資訊,包括開始時間、結束時間、開始位置、結束位置、車程成本和其他屬性。
%pip install azureml-opendatasets
本文的其餘部分會依賴 Apache Spark 先對紐約市計程車車程小費資料執行一些分析,然後開發模型來預測特定車程是否包含小費。
建立 Apache Spark 機器學習模型
建立 PySpark 筆記本。 如需詳細資訊,請造訪建立筆記本。
匯入此筆記本所需的類型。
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
我們將會使用 MLflow 來追蹤機器學習實驗和對應的執行。 如果已啟用 Microsoft Fabric 自動記錄,系統會自動擷取對應的計量和參數。
import mlflow
建構輸入 DataFrame
此範例會將資料載入 Pandas DataFrame ,然後將其轉換成 Apache Spark DataFrame 。 在該格式中,我們可以套用其他 Apache Spark 作業來清理和篩選資料集。
將這些行貼到新的儲存格中,然後執行以建立 Spark DataFrame。 此步驟會透過開放資料集 API 擷取資料。 我們可以篩選此資料,以檢查特定資料視窗。 程式碼範例會使用
start_date
和end_date
來套用傳回單一月份資料的篩選條件。from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
此程式碼會將資料集減少到大約 10,000 個資料列。 為了加速開發和定型,程式碼會立即向下取樣我們的資料集。
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
我們想要使用內建
display()
命令來查看資料。 使用此命令,我們可以輕鬆地檢視資料範例,或以圖形方式探索資料的趨勢。#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
準備資料
資料準備是機器學習程序中的重要步驟。 它牽涉到清理、轉換及組織未經處理資料,使其適合分析和模型化。 在此程式碼範例中,您會執行數個資料準備步驟:
- 篩選資料集以移除極端值和不正確的值
- 拿掉模型定型不需要的資料行
- 從未經處理資料建立新的資料行
- 產生標籤,以判斷指定的計程車車程是否牽涉小費
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
接下來,對資料進行第二次傳遞,以新增最終功能。
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
建立羅吉斯迴歸模型
最終工作會將標示的資料轉換成羅吉斯迴歸可以處理的格式。 羅吉斯迴歸演算法的輸入必須具有標籤/特徵向量組結構,其中特徵向量是代表輸入點的數字向量。
根據最終工作需求,我們必須將類別資料行轉換成數位。 具體而言,我們必須將 trafficTimeBins
和 weekdayString
資料行轉換成整數表示法。 許多選項可用來處理這項需求。 此範例牽涉 OneHotEncoder
方法:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
此動作會產生具有適當格式之所有資料行的新 DataFrame,以定型模型。
訓練羅吉斯迴歸模型
第一個工作會將資料集分割成定型集,以及測試或驗證集。
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
一旦我們有兩個 DataFrame,必須建立模型公式,並針對定型 DataFrame 予以執行。 然後,我們可以針對測試 DataFrame 進行驗證。 試驗不同版本的模型公式,查看不同組合的效果。
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
儲存格輸出:
Area under ROC = 0.9749430523917996
建立預測的視覺表示法
我們現在可以建置最終視覺效果,以解譯模型結果。 ROC 曲線當然可以呈現結果。
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()