Dela via


Skapa en maskininlärningsmodell med Apache Spark MLlib

I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som hanterar enkel förutsägelseanalys på en öppen Azure-datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.

De grundläggande SparkML- och MLlib Spark-biblioteken tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter. Dessa verktyg är lämpliga för:

  • Klassificering
  • Klustring
  • Hypotestestning och beräkning av exempelstatistik
  • Regression
  • Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
  • Ämnesmodellering

Förstå klassificering och logistisk regression

Klassificering, en populär maskininlärningsuppgift, omfattar sortering av indata i kategorier. En klassificeringsalgoritm bör ta reda på hur du tilldelar etiketter till angivna indata. En maskininlärningsalgoritm kan till exempel acceptera aktieinformation som indata och dela upp aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.

Algoritmen för logistisk regression är användbar för klassificering. Api:et för logistisk regression i Spark är användbart för binär klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.

Logistisk regression skapar en logistisk funktion som kan förutsäga sannolikheten för att en indatavektor tillhör den ena gruppen eller den andra.

Exempel på förutsägelseanalys av NYC-taxidata

azureml-opendatasetsInstallera först . Data är tillgängliga via Azure Open Datasets-resursen . Den här delmängden innehåller information om gula taxiresor, inklusive starttider, sluttider, startplatser, slutplatser, resekostnader och andra attribut.

%pip install azureml-opendatasets

Resten av den här artikeln förlitar sig på att Apache Spark först utför en analys av nyc taxi-trip-tipsdata och sedan utvecklar en modell för att förutsäga om en viss resa innehåller ett tips eller inte.

Skapa en Apache Spark-maskininlärningsmodell

  1. Skapa en PySpark-anteckningsbok. Mer information finns i Skapa en anteckningsbok.

  2. Importera de typer som krävs för den här notebook-filen.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi använder MLflow för att spåra våra maskininlärningsexperiment och motsvarande körningar. Om Microsoft Fabric Autologging är aktiverat registreras motsvarande mått och parametrar automatiskt.

    import mlflow
    

Konstruera indataramen

Det här exemplet läser in data i en Pandas-dataram och konverterar dem sedan till en Apache Spark-dataram. I det formatet kan vi använda andra Apache Spark-åtgärder för att rensa och filtrera datamängden.

  1. Klistra in dessa rader i en ny cell och kör dem för att skapa en Spark DataFrame. Det här steget hämtar data via API:et Öppna datauppsättningar. Vi kan filtrera ned dessa data för att undersöka ett specifikt datafönster. Kodexemplet använder start_date och end_date för att tillämpa ett filter som returnerar en enda månad med data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Den här koden minskar datamängden till cirka 10 000 rader. För att påskynda utvecklingen och träningen tar kodexemplen ned vår datauppsättning för tillfället.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Vi vill titta på våra data med hjälp av det inbyggda display() kommandot. Med det här kommandot kan vi enkelt visa ett dataexempel eller grafiskt utforska trender i data.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Förbereda data

Dataförberedelse är ett viktigt steg i maskininlärningsprocessen. Det handlar om att rensa, transformera och organisera rådata för att göra dem lämpliga för analys och modellering. I det här kodexemplet utför du flera dataförberedelsesteg:

  • Filtrera datamängden för att ta bort avvikande värden och felaktiga värden
  • Ta bort kolumner som inte behövs för modellträning
  • Skapa nya kolumner från rådata
  • Generera en etikett för att avgöra om en viss taxiresa innehåller ett tips eller inte
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Gör sedan en andra pass over data för att lägga till de sista funktionerna.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Skapa en logistisk regressionsmodell

Den sista uppgiften konverterar etiketterade data till ett format som logistisk regression kan hantera. Indata till en logistisk regressionsalgoritm måste ha en struktur för etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.

Baserat på de slutliga uppgiftskraven måste vi konvertera de kategoriska kolumnerna till tal. Mer specifikt måste vi konvertera kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer. Vi har många tillgängliga alternativ för att hantera det här kravet. Det här exemplet omfattar OneHotEncoder metoden:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Den här åtgärden resulterar i en ny DataFrame med alla kolumner i rätt format för att träna en modell.

Träna en logistisk regressionsmodell

Den första uppgiften delar upp datamängden i en träningsuppsättning och en test- eller valideringsuppsättning.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

När vi har två DataFrames måste vi skapa modellformeln och köra den mot träningsdataramen. Sedan kan vi verifiera mot testdataRamen. Experimentera med olika versioner av modellformeln för att se effekterna av olika kombinationer.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Cellen utdata:

Area under ROC = 0.9749430523917996

Skapa en visuell representation av förutsägelsen

Nu kan vi skapa en slutlig visualisering för att tolka modellresultaten. En ROC-kurva kan säkert presentera resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Diagram som visar ROC-kurvan för logistisk regression i tipsmodellen.