Bygg en maskinlæringsmodell med Apache Spark MLlib
I denne artikkelen lærer du hvordan du bruker Apache Spark MLlib til å opprette et maskinlæringsprogram som håndterer enkel prediktiv analyse på et Azure-åpent datasett. Spark gir innebygde maskinlæringsbiblioteker. Dette eksemplet bruker klassifisering gjennom logistisk regresjon.
Kjernebibliotekene SparkML og MLlib Spark gir mange verktøy som er nyttige for maskinlæringsoppgaver. Disse verktøyene er egnet for:
- Klassifisering
- Klynging
- Hypotesetesting og beregning av eksempelstatistikk
- Regresjon
- Entallsverdikomponering (SVD) og hovedkomponentanalyse (PCA)
- Emnemodellering
Forstå klassifisering og logistisk regresjon
Klassifisering, en populær maskinlæringsoppgave, innebærer sortering av inndata i kategorier. En klassifiseringsalgoritme bør finne ut hvordan du tilordner etiketter til de angitte inndata. En maskinlæringsalgoritme kan for eksempel godta aksjeinformasjon som inndata, og dele aksjen inn i to kategorier: aksjer som du bør selge og aksjer som du bør beholde.
Algoritmen for logistikkregresjon er nyttig for klassifisering. Spark-logistikkregresjons-API-en er nyttig for binær klassifisering av inndata i én av to grupper. Hvis du vil ha mer informasjon om logistisk regresjon, kan du se Wikipedia.
Logistisk regresjon produserer en logistisk funksjon som kan forutsi sannsynligheten for at en inndatavektor tilhører én gruppe eller den andre.
Eksempel på prediktiv analyse av NYC-taxidata
Først må du installere azureml-opendatasets
. Dataene er tilgjengelige via Azure Open Datasets-ressursen. Dette delsettet for datasettet inneholder informasjon om gule taxiturer, inkludert starttider, sluttidspunkt, startsteder, sluttsteder, reisekostnader og andre attributter.
%pip install azureml-opendatasets
Resten av denne artikkelen er avhengig av Apache Spark for først å utføre noen analyser på NYC taxi-trip tips data og deretter utvikle en modell for å forutsi om en bestemt tur inkluderer et tips eller ikke.
Opprette en Apache Spark-maskinlæringsmodell
Opprett en PySpark-notatblokk. Hvis du vil ha mer informasjon, kan du gå til Opprett en notatblokk.
Importer typene som kreves for denne notatblokken.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Vi bruker MLflow til å spore maskinlæringseksperimentene og tilsvarende kjøringer. Hvis Microsoft Fabric Autologging er aktivert, registreres de tilsvarende måledataene og parameterne automatisk.
import mlflow
Konstruere datarammen for inndata
Dette eksemplet laster inn dataene i en Pandas-dataramme, og konverterer dem deretter til en Apache Spark-dataramme. I dette formatet kan vi bruke andre Apache Spark-operasjoner for å rense og filtrere datasettet.
Lim inn disse linjene i en ny celle, og kjør dem for å opprette en Spark DataFrame. Dette trinnet henter dataene via API-en For åpne datasett. Vi kan filtrere disse dataene ned for å undersøke et bestemt datavindu. Kodeeksempelet bruker
start_date
ogend_date
bruker et filter som returnerer én enkelt måned med data.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Denne koden reduserer datasettet til ca. 10 000 rader. For å øke hastigheten på utvikling og opplæring, prøver koden ned datasettet for øyeblikket.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Vi ønsker å se på dataene våre ved hjelp av den innebygde
display()
kommandoen. Med denne kommandoen kan vi enkelt vise et dataeksempel eller utforske trender i dataene grafisk.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Klargjøre dataene
Dataforberedelse er et viktig trinn i maskinlæringsprosessen. Det innebærer rengjøring, transformasjon og organisering av rådata, for å gjøre det egnet for analyse og modellering. I dette kodeeksempelet utfører du flere trinn for klargjøring av data:
- Filtrer datasettet for å fjerne ytterpunkter og uriktige verdier
- Fjerne kolonner som ikke er nødvendige for modellopplæring
- Opprette nye kolonner fra rådataene
- Generer en etikett for å finne ut om en gitt taxitur innebærer et tips
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Deretter foretar du en ny overføring over dataene for å legge til de endelige funksjonene.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Opprette en logistisk regresjonsmodell
Den endelige oppgaven konverterer de merkede dataene til et format som logistikkregresjon kan håndtere. Inndataene til en logistisk regresjonsalgoritme må ha en struktur for etikett/funksjonsvektorpar, der funksjonsvektoren er en vektor av tall som representerer inndatapunktet.
Basert på de endelige oppgavekravene må vi konvertere de kategoriske kolonnene til tall. Spesielt må vi konvertere trafficTimeBins
kolonnene og weekdayString
kolonnene til heltallsrepresentasjoner. Vi har mange tilgjengelige alternativer for å håndtere dette kravet. Dette eksemplet OneHotEncoder
omfatter fremgangsmåten:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Denne handlingen resulterer i en ny DataFrame med alle kolonner i riktig format for å lære opp en modell.
Lære opp en logistisk regresjonsmodell
Den første oppgaven deler datasettet inn i et opplæringssett, og et test- eller valideringssett.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Når vi har to DataFrames, må vi opprette modellformelen og kjøre den mot opplæringsdatarammen. Deretter kan vi validere mot testdatarammen. Eksperimenter med ulike versjoner av modellformelen for å se effektene av ulike kombinasjoner.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Celleutdataene:
Area under ROC = 0.9749430523917996
Opprette en visuell representasjon av prognosen
Vi kan nå bygge en endelig visualisering for å tolke modellresultatene. En ROC-kurve kan sikkert presentere resultatet.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Relatert innhold
- Bruk AI-eksempler til å bygge maskinlæringsmodeller: Bruk AI-eksempler
- Spor maskinlæringskjøringer ved hjelp av eksperimenter: Maskinlæringseksperimenter