Erstellen einer Apache Spark-Machine Learning-Pipeline
Die skalierbare Machine Learning-Bibliothek von Apache Spark (MLlib) bindet Modellierungsfunktionen in eine verteilte Umgebung ein. Das Spark-Paket spark.ml
umfasst eine Gruppe von auf Dataframes basierenden APIs auf hoher Ebene. Mit diesen APIs können Sie praktische Machine Learning-Pipelines erstellen und optimieren.
Spark-Machine Learning bezieht sich auf diese auf Dataframes basierende MLlib-API und nicht auf die ältere RDD-basierte Pipeline-API.
Bei einer Machine Learning-Pipeline (ML-Pipeline) handelt es sich um einen vollständigen Workflow, der mehrere Algorithmen für maschinelles Lernen miteinander kombiniert. Zum Verarbeiten und Lernen von Daten können viele Schritte und eine Reihe von Algorithmen erforderlich sein. Pipelines definieren die Phasen eines Machine Learning-Prozesses und deren Reihenfolge. In MLlib werden Phasen einer Pipeline durch eine bestimmte Abfolge von Pipelinephasen dargestellt, wobei ein Transformator und ein Estimator jeweils Aufgaben ausführen.
Ein Transformator ist ein Algorithmus, der unter Verwendung der transform()
-Methode einen Dataframe in einen anderen Dataframe transformiert. Beispielsweise kann ein Featuretransformator eine Spalte eines Dataframes lesen, diese einer anderen Spalte zuordnen und einen neuen Dataframe mit der angehängten zugeordneten Spalte ausgeben.
Ein Estimator ist eine Abstraktion von Lernalgorithmen und dient der Anpassung oder dem Training für ein Dataset zum Generieren eines Transformators. Ein Estimator implementiert eine Methode mit dem Namen fit()
, die einen Dataframe akzeptiert und einen Dataframe generiert, der ein Transformator ist.
Jeder zustandslosen Instanz eines Transformators oder Estimators ist ein eigener eindeutiger Bezeichner zugewiesen, der beim Angeben von Parametern verwendet wird. Beide verwenden eine einheitliche API zur Angabe dieser Parameter.
Pipelinebeispiel
Zur Veranschaulichung der praktischen Verwendung einer ML-Pipeline verwendet dieses Beispiel die Datendatei HVAC.csv
, die bereits in den Standardspeicher (Azure Storage oder Data Lake Storage) für Ihren HDInsight-Cluster geladen wurde. Zum Anzeigen des Inhalts der Datei navigieren Sie zum Verzeichnis /HdiSamples/HdiSamples/SensorSampleData/hvac
.
HVAC.csv
enthält einen Satz von Zeiten mit den Zieltemperaturen und den tatsächlichen Temperaturen für HVAC-Systeme (Heating, Ventilation, Air Conditioning) in verschiedenen Gebäuden. Ziel ist das Trainieren des Modells für die Daten und das Generieren einer Temperaturprognose für ein bestimmtes Gebäude.
Der folgende Code
- Definiert ein
LabeledDocument
, in dem dieBuildingID
, dieSystemInfo
(Bezeichner eines Systems und Alter) und einlabel
(1.0, wenn das Gebäude überhitzt ist, andernfalls 0.0) gespeichert sind. - Erstellt die benutzerdefinierte Parserfunktion
parseDocument
, die eine Datenzeile abruft und ermittelt, ob das Gebäude warm („hot“) ist, indem die Zieltemperatur mit der tatsächlichen Temperatur verglichen wird. - Wendet den Parser beim Extrahieren der Quelldaten an.
- Erstellt Trainingsdaten.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
Diese Beispielpipeline umfasst drei Phasen: Tokenizer
und HashingTF
(beide Transformatoren) und Logistic Regression
(Estimator). Die extrahierten und analysierten Daten im training
-Dataframe durchlaufen die Pipeline, wenn pipeline.fit(training)
aufgerufen wird.
- In der ersten Phase
Tokenizer
wird die EingabespalteSystemInfo
(bestehend aus dem Systembezeichner und den Alterswerten) in die Ausgabespaltewords
unterteilt. Diese neue Spaltewords
wird dem Dataframe hinzugefügt. - In der zweiten Phase
HashingTF
wird die neue Spaltewords
in Funktionsvektoren umgewandelt. Diese neue Spaltefeatures
wird dem Dataframe hinzugefügt. Bei diesen ersten beiden Phasen handelt es sich um Transformatoren. - Bei der dritten Phase
LogisticRegression
handelt es sich um einen Estimator, daher ruft die Pipeline dieLogisticRegression.fit()
-Methode auf, um einLogisticRegressionModel
zu generieren.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Um die neuen von den Transformatoren Tokenizer
und HashingTF
hinzugefügten Spalten words
und features
sowie ein Beispiel für den Estimator LogisticRegression
anzuzeigen, führen Sie eine PipelineModel.transform()
-Methode für den ursprünglichen Dataframe aus. Im Produktionscode ist der nächste Schritt die Übergabe in einen Testdataframe, um das Training zu überprüfen.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Das model
-Objekt kann nun für Vorhersagen verwendet werden. Das vollständige Beispiel dieser Machine Learning-Anwendung sowie eine ausführliche Anleitung für die Ausführung finden Sie unter Erstellen einer Apache Spark-Machine Learning-Anwendung in Azure HDInsight.