Erstellen eines Machine Learning-Modells mit Apache Spark MLlib
In diesem Artikel erfahren Sie, wie Sie mithilfe von Apache Spark MLlib eine Machine-Learning-Anwendung erstellen, die eine einfache Vorhersageanalyse für ein Azure Open Dataset ausführt. Spark bietet integrierte Machine Learning-Bibliotheken. In diesem Beispiel wird eine Klassifizierung mittels logistischer Regression verwendet.
Die SparkML und MLlib Spark-Kernbibliotheken bieten viele Dienstprogramme, die für Machine Learning Aufgaben nützlich sind. Diese Dienstprogramme eignen sich für:
- Klassifizierung
- Clustering
- Testen von Hypothesen und Berechnen von Beispielstatistiken
- Regression
- Singulärwertzerlegung (Singular Value Decomposition, SVD) und Hauptkomponentenanalyse (Principal Component Analysis, PCA)
- Themenmodellierung
Grundlegendes zu Klassifizierung und logistischer Regression
Bei der Klassifizierung, einer beliebten Aufgabe des Machine Learning, werden die Eingabedaten in Kategorien sortiert. Ein Klassifizierungsalgorithmus sollte herausfinden, wie die bereitgestellten Eingabedaten Bezeichnungen zugewiesen werden. So könnte ein Machine Learning-Algorithmus beispielsweise Börsendaten als Eingabe akzeptieren und die Daten in zwei Kategorien einteilen: Aktien, die Sie verkaufen sollten, und solche, die Sie behalten sollten.
Der Algorithmus Logistische Regression ist nützlich für die Klassifizierung. Die API für die logistische Regression von Spark ist nützlich für eine binäre Klassifizierung der Eingabedaten in einer von zwei Gruppen. Weitere Informationen zur logistischen Regression finden Sie in Wikipedia.
Die logistische Regression erzeugt eine logistische Funktion, die die Wahrscheinlichkeit vorhersagen kann, dass ein Eingabevektor zu einer Gruppe gehört.
Beispiel für eine Vorhersageanalyse von NYC-Taxidaten
Installieren Sie zunächst azureml-opendatasets
. Die Daten sind über die Ressource Azure Open Datasets verfügbar. Diese Teilmenge des Datasets enthält Informationen zu Taxifahrten von Yellow Cabs, einschließlich Informationen zu den Start- und Endzeiten, den Start- und Zielorten, den Fahrtkosten und anderer Attribute.
%pip install azureml-opendatasets
Der Rest dieses Artikels stützt sich auf Apache Spark, um einige Analysen der NYC-Taxi-Trinkgelddaten durchzuführen und dann ein Modell zu entwickeln, um vorherzusagen, ob eine bestimmte Reise ein Trinkgeld enthält oder nicht.
Erstellen eines Apache Spark-Machine Learning-Modells
Erstellen Sie ein PySpark-Notebook. Weitere Informationen finden Sie unter Erstellen eines Notizbuchs.
Importieren Sie die für dieses Notebook erforderlichen Typen.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Sie verwenden MLflow, um die Machine Learning-Experimente und die entsprechenden Ausführungen nachzuverfolgen. Wenn die automatische Protokollierung von Microsoft Fabric aktiviert ist, werden die entsprechenden Metriken und Parameter automatisch erfasst.
import mlflow
Erstellen des Eingabedatenrahmens
In diesem Beispiel werden die Daten in einen Pandas-Datenrahmen geladen und dann in einen Apache Spark-Datenrahmen konvertiert. In diesem Format können Sie andere Apache Spark-Vorgänge anwenden, um das Dataset zu bereinigen und zu filtern.
Fügen Sie diese Zeilen in eine neue Zelle ein, und führen Sie sie aus, um einen Spark DataFrame zu erstellen. In diesem Schritt werden die Daten über die Open Datasets-API abgerufen. Sie können diese Daten durch Filtern reduzieren, um ein bestimmtes Datenfenster zu untersuchen. Im Codebeispiel werden
start_date
undend_date
verwendet, um einen Filter anzuwenden, der Daten für einen einzelnen Monat zurückgibt.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Dieser Code reduziert das Dataset auf etwa 10.000 Zeilen. Um die Entwicklung und das Training zu beschleunigen, wird der Code vorerst das Dataset abtasten.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Wir wollen unsere Daten mit Hilfe des integrierten
display()
-Befehls betrachten. Mit diesem Befehl können Sie auf einfache Weise eine Datenprobe anzeigen oder Trends in den Daten grafisch untersuchen.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Vorbereiten der Daten
Die Datenaufbereitung ist ein wichtiger Schritt im Machine Learning-Prozess. Sie umfasst das Bereinigen, die Transformation und Organisation von Rohdaten, um sie für die Analyse und Modellierung geeignet zu machen. In diesem Codebeispiel führen Sie mehrere Datenaufbereitungsschritte aus:
- Filtern des Datensatzes zum Entfernen von Ausreißern und falschen Werten
- Entfernen von Spalten, die für das Modelltraining nicht benötigt werden
- Erstellen neuer Spalten aus den Rohdaten
- Generieren Sie eine Bezeichnung, um zu bestimmen, ob eine bestimmte Taxifahrt ein Trinkgeld beinhaltet.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
In einem zweiten Durchlauf für die Daten werden dann die endgültigen Features hinzugefügt.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Erstellen eines logistischen Regressionsmodells
Die letzte Aufgabe besteht darin, die Daten mit Label in ein Format zu konvertieren, das die logistische Regression verarbeiten kann. Die Eingabe für einen logistischen Regressionsalgorithmus muss eine Struktur von Paaren aus Bezeichnung und Featurevektor aufweisen, wobei der Featurevektor aus Zahlen besteht, die den Eingabepunkt darstellen.
Basierend auf den endgültigen Aufgabenanforderungen müssen wir die kategorisierten Spalten in Zahlen konvertieren. Genauer gesagt, müssen die Spalten trafficTimeBins
und weekdayString
in ganzzahlige Darstellungen konvertiert werden. Wir haben viele Optionen, um diese Anforderung zu erfüllen. In diesem Beispiel wird der OneHotEncoder
Ansatz berücksichtigt:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Diese Aktion führt zu einem neuen Datenrahmen mit allen Spalten im richtigen Format, um ein Modell zu trainieren.
Trainieren eines logistischen Regressionsmodells
Die erste Aufgabe teilt das Dataset in ein Trainingsdataset und ein Test- oder Validierungsdataset auf.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Sobald zwei DataFrames vorhanden sind, müssen wir die Modellformel erstellen und mit dem Training DataFrame ausführen. Anschließend können Sie eine Überprüfung anhand des Testdatenrahmens ausführen. Experimentieren Sie mit verschiedenen Versionen der Modellformel, um die Auswirkungen verschiedener Kombinationen zu ermitteln.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Die Zellenausgabe:
Area under ROC = 0.9749430523917996
Erstellen einer visuellen Darstellung der Vorhersage
Sie können jetzt eine endgültige Visualisierung erstellen, um die Modellergebnisse zu interpretieren. Eine ROC-Kurve kann sicherlich das Ergebnis darstellen.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Zugehöriger Inhalt
- Verwenden von KI-Beispielen zum Erstellen von Machine Learning-Modellen: Verwenden von KI-Beispielen
- Nachverfolgen von Machine Learning-Ausführungen mithilfe von Experimenten: Machine Learning-Experimente