使用 Apache Spark MLlib 生成机器学习模型
本文介绍如何使用 Apache Spark MLlib 创建机器学习应用程序,该应用程序对 Azure 开放数据集处理简单的预测分析。 Spark 提供内置机器学习库。 此示例通过逻辑回归使用分类。
核心的 SparkML 和 MLlib Spark 库提供许多可用于机器学习任务的实用工具。 这些实用工具适用于以下任务:
- 分类
- 群集
- 假设测试和计算示例统计信息
- 回归
- 单值分解 (SVD) 和主体组件分析 (PCA)
- 主题建模
了解分类和逻辑回归
分类是一种常见的机器学习任务,涉及将输入数据按类别排序。 分类算法应确定如何将标签分配给提供的输入数据。 例如,机器学习算法可以将股票信息作为输入并将股票分为两个类别:应该卖出的股票和应该保留的股票。
逻辑回归算法对于分类非常实用。 Spark 逻辑回归 API 对于二元分类非常实用,即将输入数据归类到两组中的一组。 有关逻辑回归的详细信息,请参阅维基百科。
逻辑回归会产生一个逻辑函数,该函数可预测输入向量属于其中一个组或另一个组的概率。
纽约市出租车数据预测分析示例
首先,安装 azureml-opendatasets
。 数据通过 Azure 开放数据集资源提供。 该数据集的子集包含有关黄色出租车行程的信息,包括开始时间、结束时间、开始地点、结束地点、行程费用和其他属性。
%pip install azureml-opendatasets
本文的其余部分需要 Apache Spark 首先对纽约市出租车行程小费数据执行一些分析,然后开发一个模型来预测特定行程是否包含小费。
创建 Apache Spark 机器学习模型
创建一个 PySpark 笔记本。 有关详细信息,请访问创建笔记本。
导入此笔记本所需的类型。
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
我们将使用 MLflow 跟踪机器学习试验和相应的运行。 如果启用了 Microsoft Fabric 自动日志记录,则会自动捕获相应的指标和参数。
import mlflow
构造输入数据帧
此示例会将数据加载到 Pandas 数据帧中,然后将其转换为 Apache Spark 数据帧。 使用该格式,我们可以应用其他 Apache Spark 操作来清理和筛选数据集。
将这些行粘贴到一个新单元格中,然后运行它们以创建 Spark 数据帧。 此步骤会通过开放数据集 API 检索数据。 我们可以进一步筛选此数据,以检查特定的数据窗口。 此代码示例使用
start_date
和end_date
来应用一个会返回单个月份数据的筛选器。from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
此代码会将数据集减少到大约 10,000 行。 为了加快开发和训练,我们暂时对数据集进行了采样。
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
我们希望使用内置
display()
命令检查一下数据。 借助该命令,我们能够轻松查看数据样本或以图形方式探索数据的趋势。#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
准备数据
数据准备是机器学习过程中的关键一步。 它涉及清理、转换和组织原始数据,使其适合分析和建模。 以下代码示例中将执行多个数据准备步骤:
- 筛选数据集以移除离群值和错误值
- 移除模型训练不需要的列
- 从原始数据创建新列
- 生成标签以确定给定的出租车行程是否涉及小费
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
然后对数据进行第二次传递以添加最终特征。
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
创建逻辑回归模型
最后一项任务是将标签数据转换为逻辑回归可处理的格式。 逻辑回归算法的输入必须采用标签/特征矢量对结构,其中特征矢量是表示输入点的数字矢量。
根据最终任务要求,必须将分类列转换为数字。 具体而言,必须将 trafficTimeBins
和 weekdayString
列转换为整数表示形式。 有许多选项可用于满足此要求。 此示例涉及 OneHotEncoder
方法:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
此操作会生成一个新的数据帧(其中所有列都采用正确的格式)来训练模型。
训练逻辑回归模型
第一个任务会将数据集拆分为训练集、测试集或验证集。
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
有了两个数据帧后,必须创建模型公式,并针对训练数据帧运行该公式。 然后,可针对测试数据帧进行验证。 试验不同版本的模型公式,以了解不同组合的影响。
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
单元格输出:
Area under ROC = 0.9749430523917996
创建预测的直观表示形式
现在可以生成最终可视化效果来解释模型结果。 ROC 曲线当然可以显示结果。
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()