Del via


Bygg en maskinlæringsmodell med Apache Spark MLlib

I denne artikkelen lærer du hvordan du bruker Apache Spark MLlib til å opprette et maskinlæringsprogram som håndterer enkel prediktiv analyse på et Azure-åpent datasett. Spark gir innebygde maskinlæringsbiblioteker. Dette eksemplet bruker klassifisering gjennom logistisk regresjon.

Kjernebibliotekene SparkML og MLlib Spark gir mange verktøy som er nyttige for maskinlæringsoppgaver. Disse verktøyene er egnet for:

  • Klassifisering
  • Klynging
  • Hypotesetesting og beregning av eksempelstatistikk
  • Regresjon
  • Entallsverdikomponering (SVD) og hovedkomponentanalyse (PCA)
  • Emnemodellering

Forstå klassifisering og logistisk regresjon

Klassifisering, en populær maskinlæringsoppgave, innebærer sortering av inndata i kategorier. En klassifiseringsalgoritme bør finne ut hvordan du tilordner etiketter til de angitte inndata. En maskinlæringsalgoritme kan for eksempel godta aksjeinformasjon som inndata, og dele aksjen inn i to kategorier: aksjer som du bør selge og aksjer som du bør beholde.

Algoritmen for logistikkregresjon er nyttig for klassifisering. Spark-logistikkregresjons-API-en er nyttig for binær klassifisering av inndata i én av to grupper. Hvis du vil ha mer informasjon om logistisk regresjon, kan du se Wikipedia.

Logistisk regresjon produserer en logistisk funksjon som kan forutsi sannsynligheten for at en inndatavektor tilhører én gruppe eller den andre.

Eksempel på prediktiv analyse av NYC-taxidata

Først må du installere azureml-opendatasets. Dataene er tilgjengelige via Azure Open Datasets-ressursen. Dette delsettet for datasettet inneholder informasjon om gule taxiturer, inkludert starttider, sluttidspunkt, startsteder, sluttsteder, reisekostnader og andre attributter.

%pip install azureml-opendatasets

Resten av denne artikkelen er avhengig av Apache Spark for først å utføre noen analyser på NYC taxi-trip tips data og deretter utvikle en modell for å forutsi om en bestemt tur inkluderer et tips eller ikke.

Opprette en Apache Spark-maskinlæringsmodell

  1. Opprett en PySpark-notatblokk. Hvis du vil ha mer informasjon, kan du gå til Opprett en notatblokk.

  2. Importer typene som kreves for denne notatblokken.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi bruker MLflow til å spore maskinlæringseksperimentene og tilsvarende kjøringer. Hvis Microsoft Fabric Autologging er aktivert, registreres de tilsvarende måledataene og parameterne automatisk.

    import mlflow
    

Konstruere datarammen for inndata

Dette eksemplet laster inn dataene i en Pandas-dataramme, og konverterer dem deretter til en Apache Spark-dataramme. I dette formatet kan vi bruke andre Apache Spark-operasjoner for å rense og filtrere datasettet.

  1. Lim inn disse linjene i en ny celle, og kjør dem for å opprette en Spark DataFrame. Dette trinnet henter dataene via API-en For åpne datasett. Vi kan filtrere disse dataene ned for å undersøke et bestemt datavindu. Kodeeksempelet bruker start_date og end_date bruker et filter som returnerer én enkelt måned med data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Denne koden reduserer datasettet til ca. 10 000 rader. For å øke hastigheten på utvikling og opplæring, prøver koden ned datasettet for øyeblikket.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Vi ønsker å se på dataene våre ved hjelp av den innebygde display() kommandoen. Med denne kommandoen kan vi enkelt vise et dataeksempel eller utforske trender i dataene grafisk.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Klargjøre dataene

Dataforberedelse er et viktig trinn i maskinlæringsprosessen. Det innebærer rengjøring, transformasjon og organisering av rådata, for å gjøre det egnet for analyse og modellering. I dette kodeeksempelet utfører du flere trinn for klargjøring av data:

  • Filtrer datasettet for å fjerne ytterpunkter og uriktige verdier
  • Fjerne kolonner som ikke er nødvendige for modellopplæring
  • Opprette nye kolonner fra rådataene
  • Generer en etikett for å finne ut om en gitt taxitur innebærer et tips
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Deretter foretar du en ny overføring over dataene for å legge til de endelige funksjonene.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opprette en logistisk regresjonsmodell

Den endelige oppgaven konverterer de merkede dataene til et format som logistikkregresjon kan håndtere. Inndataene til en logistisk regresjonsalgoritme må ha en struktur for etikett/funksjonsvektorpar, der funksjonsvektoren er en vektor av tall som representerer inndatapunktet.

Basert på de endelige oppgavekravene må vi konvertere de kategoriske kolonnene til tall. Spesielt må vi konvertere trafficTimeBins kolonnene og weekdayString kolonnene til heltallsrepresentasjoner. Vi har mange tilgjengelige alternativer for å håndtere dette kravet. Dette eksemplet OneHotEncoder omfatter fremgangsmåten:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handlingen resulterer i en ny DataFrame med alle kolonner i riktig format for å lære opp en modell.

Lære opp en logistisk regresjonsmodell

Den første oppgaven deler datasettet inn i et opplæringssett, og et test- eller valideringssett.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Når vi har to DataFrames, må vi opprette modellformelen og kjøre den mot opplæringsdatarammen. Deretter kan vi validere mot testdatarammen. Eksperimenter med ulike versjoner av modellformelen for å se effektene av ulike kombinasjoner.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Celleutdataene:

Area under ROC = 0.9749430523917996

Opprette en visuell representasjon av prognosen

Vi kan nå bygge en endelig visualisering for å tolke modellresultatene. En ROC-kurve kan sikkert presentere resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf som viser ROC-kurven for logistisk regresjon i tipsmodellen.