Een machine learning-model bouwen met Apache Spark MLlib
In dit artikel leert u hoe u Apache Spark MLlib gebruikt om een machine learning-toepassing te maken waarmee eenvoudige voorspellende analyses in een open Azure-gegevensset worden verwerkt. Spark biedt ingebouwde machine learning-bibliotheken. In dit voorbeeld wordt gebruikgemaakt van classificatie via logistieke regressie.
De sparkML- en MLlib Spark-kernbibliotheken bieden veel hulpprogramma's die nuttig zijn voor machine learning-taken. Deze hulpprogramma's zijn geschikt voor:
- Classificatie
- Clustering
- Hypothesen voor het testen en berekenen van voorbeeldstatistieken
- Regressie
- SVD (Singular Value Decomposition) en PCA (Principal Component Snalysis)
- Modellering van onderwerpen
Classificatie en logistieke regressie begrijpen
Classificatie, een populaire machine learning-taak, omvat het sorteren van invoergegevens in categorieën. Een classificatie-algoritme moet bepalen hoe labels moeten worden toegewezen aan de opgegeven invoergegevens. Een machine learning-algoritme kan bijvoorbeeld aandelengegevens accepteren als invoer en het aandeel in twee categorieën verdelen: aandelen die u moet verkopen en aandelen die u moet behouden.
Het logistieke regressie-algoritme is handig voor classificatie. De Logistieke Regressie-API van Spark is handig voor binaire classificatie van invoergegevens in een van twee groepen. Zie Wikipedia voor meer informatie over logistieke regressie.
Logistieke regressie produceert een logistieke functie die de kans kan voorspellen dat een invoervector deel uitmaakt van de ene groep of de andere.
Voorbeeld van voorspellende analyse van NYC-taxigegevens
azureml-opendatasets
Installeer eerst . De gegevens zijn beschikbaar via de Azure Open Datasets-resource . Deze gegevensset bevat informatie over gele taxiritten, waaronder de begintijden, eindtijden, beginlocaties, eindlocaties, reiskosten en andere kenmerken.
%pip install azureml-opendatasets
De rest van dit artikel is afhankelijk van Apache Spark om eerst een analyse uit te voeren op de gegevens van de tip voor taxiritten in NYC en vervolgens een model te ontwikkelen om te voorspellen of een bepaalde reis een tip bevat of niet.
Een machine learning-model in Apache Spark maken
Maak een PySpark-notebook. Ga naar Een notitieblok maken voor meer informatie.
Importeer de typen die vereist zijn voor dit notebook.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
We gebruiken MLflow om onze machine learning-experimenten en bijbehorende uitvoeringen bij te houden. Als Automatische aanmelding van Microsoft Fabric is ingeschakeld, worden de bijbehorende metrische gegevens en parameters automatisch vastgelegd.
import mlflow
Het dataframe voor invoer maken
In dit voorbeeld worden de gegevens in een Pandas-dataframe geladen en vervolgens geconverteerd naar een Apache Spark-gegevensframe. In deze indeling kunnen we andere Apache Spark-bewerkingen toepassen om de gegevensset op te schonen en te filteren.
Plak deze regels in een nieuwe cel en voer ze uit om een Spark DataFrame te maken. Met deze stap worden de gegevens opgehaald via de Open Datasets-API. We kunnen deze gegevens filteren om een specifiek venster met gegevens te onderzoeken. In het codevoorbeeld wordt
start_date
een filter gebruikt enend_date
toegepast dat één maand aan gegevens retourneert.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Deze code vermindert de gegevensset tot ongeveer 10.000 rijen. Om de ontwikkeling en training te versnellen, worden de codevoorbeelden voorlopig in onze gegevensset opgeslagen.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
We willen onze gegevens bekijken met behulp van de ingebouwde
display()
opdracht. Met deze opdracht kunnen we eenvoudig een gegevensvoorbeeld bekijken of trends in de gegevens grafisch verkennen.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
De gegevens voorbereiden
Gegevensvoorbereiding is een cruciale stap in het machine learning-proces. Het omvat het opschonen, transformeren en organiseren van onbewerkte gegevens, om deze geschikt te maken voor analyse en modellering. In dit codevoorbeeld voert u verschillende stappen voor gegevensvoorbereiding uit:
- De gegevensset filteren om uitbijters en onjuiste waarden te verwijderen
- Kolommen verwijderen die niet nodig zijn voor modeltraining
- Nieuwe kolommen maken op basis van de onbewerkte gegevens
- Een label genereren om te bepalen of een bepaalde taxirit al dan niet een tip bevat
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Maak vervolgens een tweede doorgang over de gegevens om de uiteindelijke functies toe te voegen.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Een logistiek regressiemodel maken
Met de laatste taak worden de gelabelde gegevens geconverteerd naar een indeling die logistieke regressie kan verwerken. De invoer voor een logistiek regressiealgoritme moet een structuur van label-/functievectorparen hebben, waarbij de functievector een vector is van getallen die het invoerpunt vertegenwoordigen.
Op basis van de uiteindelijke taakvereisten moeten we de categorische kolommen converteren naar getallen. We moeten de trafficTimeBins
kolommen weekdayString
met name converteren naar gehele getallen. Er zijn veel opties beschikbaar om aan deze vereiste te voldoen. In dit voorbeeld wordt de OneHotEncoder
aanpak gebruikt:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Deze actie resulteert in een nieuw DataFrame met alle kolommen in de juiste indeling om een model te trainen.
Een logistiek regressiemodel trainen
De eerste taak splitst de gegevensset in een trainingsset en een test- of validatieset.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Zodra we twee DataFrames hebben, moeten we de modelformule maken en uitvoeren op het dataframe voor training. Vervolgens kunnen we valideren op basis van het testdataFrame. Experimenteer met verschillende versies van de modelformule om de effecten van verschillende combinaties te bekijken.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
De uitvoer van de cel:
Area under ROC = 0.9749430523917996
Een visuele weergave van de voorspelling maken
We kunnen nu een definitieve visualisatie maken om de modelresultaten te interpreteren. Een ROC-curve kan het resultaat zeker presenteren.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Gerelateerde inhoud
- AI-voorbeelden gebruiken om machine learning-modellen te bouwen: AI-voorbeelden gebruiken
- Machine learning-uitvoeringen bijhouden met experimenten: Machine learning-experimenten