Skapa en maskininlärningsmodell med Apache Spark MLlib
I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som hanterar enkel förutsägelseanalys på en öppen Azure-datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.
De grundläggande SparkML- och MLlib Spark-biblioteken tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter. Dessa verktyg är lämpliga för:
- Klassificering
- Klustring
- Hypotestestning och beräkning av exempelstatistik
- Regression
- Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
- Ämnesmodellering
Förstå klassificering och logistisk regression
Klassificering, en populär maskininlärningsuppgift, omfattar sortering av indata i kategorier. En klassificeringsalgoritm bör ta reda på hur du tilldelar etiketter till angivna indata. En maskininlärningsalgoritm kan till exempel acceptera aktieinformation som indata och dela upp aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.
Algoritmen för logistisk regression är användbar för klassificering. Api:et för logistisk regression i Spark är användbart för binär klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.
Logistisk regression skapar en logistisk funktion som kan förutsäga sannolikheten för att en indatavektor tillhör den ena gruppen eller den andra.
Exempel på förutsägelseanalys av NYC-taxidata
azureml-opendatasets
Installera först . Data är tillgängliga via Azure Open Datasets-resursen . Den här delmängden innehåller information om gula taxiresor, inklusive starttider, sluttider, startplatser, slutplatser, resekostnader och andra attribut.
%pip install azureml-opendatasets
Resten av den här artikeln förlitar sig på att Apache Spark först utför en analys av nyc taxi-trip-tipsdata och sedan utvecklar en modell för att förutsäga om en viss resa innehåller ett tips eller inte.
Skapa en Apache Spark-maskininlärningsmodell
Skapa en PySpark-anteckningsbok. Mer information finns i Skapa en anteckningsbok.
Importera de typer som krävs för den här notebook-filen.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Vi använder MLflow för att spåra våra maskininlärningsexperiment och motsvarande körningar. Om Microsoft Fabric Autologging är aktiverat registreras motsvarande mått och parametrar automatiskt.
import mlflow
Konstruera indataramen
Det här exemplet läser in data i en Pandas-dataram och konverterar dem sedan till en Apache Spark-dataram. I det formatet kan vi använda andra Apache Spark-åtgärder för att rensa och filtrera datamängden.
Klistra in dessa rader i en ny cell och kör dem för att skapa en Spark DataFrame. Det här steget hämtar data via API:et Öppna datauppsättningar. Vi kan filtrera ned dessa data för att undersöka ett specifikt datafönster. Kodexemplet använder
start_date
ochend_date
för att tillämpa ett filter som returnerar en enda månad med data.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Den här koden minskar datamängden till cirka 10 000 rader. För att påskynda utvecklingen och träningen tar kodexemplen ned vår datauppsättning för tillfället.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Vi vill titta på våra data med hjälp av det inbyggda
display()
kommandot. Med det här kommandot kan vi enkelt visa ett dataexempel eller grafiskt utforska trender i data.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Förbereda data
Dataförberedelse är ett viktigt steg i maskininlärningsprocessen. Det handlar om att rensa, transformera och organisera rådata för att göra dem lämpliga för analys och modellering. I det här kodexemplet utför du flera dataförberedelsesteg:
- Filtrera datamängden för att ta bort avvikande värden och felaktiga värden
- Ta bort kolumner som inte behövs för modellträning
- Skapa nya kolumner från rådata
- Generera en etikett för att avgöra om en viss taxiresa innehåller ett tips eller inte
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Gör sedan en andra pass over data för att lägga till de sista funktionerna.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Skapa en logistisk regressionsmodell
Den sista uppgiften konverterar etiketterade data till ett format som logistisk regression kan hantera. Indata till en logistisk regressionsalgoritm måste ha en struktur för etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.
Baserat på de slutliga uppgiftskraven måste vi konvertera de kategoriska kolumnerna till tal. Mer specifikt måste vi konvertera kolumnerna trafficTimeBins
och weekdayString
till heltalsrepresentationer. Vi har många tillgängliga alternativ för att hantera det här kravet. Det här exemplet omfattar OneHotEncoder
metoden:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Den här åtgärden resulterar i en ny DataFrame med alla kolumner i rätt format för att träna en modell.
Träna en logistisk regressionsmodell
Den första uppgiften delar upp datamängden i en träningsuppsättning och en test- eller valideringsuppsättning.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
När vi har två DataFrames måste vi skapa modellformeln och köra den mot träningsdataramen. Sedan kan vi verifiera mot testdataRamen. Experimentera med olika versioner av modellformeln för att se effekterna av olika kombinationer.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Cellen utdata:
Area under ROC = 0.9749430523917996
Skapa en visuell representation av förutsägelsen
Nu kan vi skapa en slutlig visualisering för att tolka modellresultaten. En ROC-kurva kan säkert presentera resultatet.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Relaterat innehåll
- Använda AI-exempel för att skapa maskininlärningsmodeller: Använda AI-exempel
- Spåra maskininlärningskörningar med experiment: Maskininlärningsexperiment