Een machine learning-pijplijn in Apache Spark maken
De schaalbare machine learning-bibliotheek (MLlib) van Apache Spark biedt modelleringsmogelijkheden in een gedistribueerde omgeving. Het Spark-pakket spark.ml
is een set API's op hoog niveau die zijn gebouwd op DataFrames. Met deze API's kunt u praktische machine learning-pijplijnen maken en afstemmen.
Spark-machine learning verwijst naar deze op MLlib DataFrame gebaseerde API, niet naar de oudere RDD-pijplijn-API.
Een machine learning-pijplijn (ML) is een volledige werkstroom waarin meerdere machine learning-algoritmen worden gecombineerd. Er kunnen veel stappen nodig zijn om gegevens te verwerken en ervan te leren. Hiervoor is een reeks algoritmen vereist. Pijplijnen definiëren de fasen en volgorde van een machine learning-proces. In MLlib worden fasen van een pijplijn vertegenwoordigd door een specifieke reeks PipelineStages, waarbij een transformer en een estimator elk taken uitvoeren.
Een transformer is een algoritme waarmee het ene DataFrame naar het andere wordt getransformeerd met behulp van de transform()
-methode. Een functietransformator kan bijvoorbeeld één kolom van een DataFrame lezen, toewijzen aan een andere kolom en een nieuw DataFrame uitvoeren waaraan de toegewezen kolom is toegevoegd.
Een estimator is een abstractie van leeralgoritmen en is verantwoordelijk voor het aanpassen of trainen van een gegevensset om een transformer te produceren. Een estimator implementeert een methode met de naam fit()
, die een DataFrame accepteert en een DataFrame produceert, wat een transformer is.
Elk staatloze exemplaar van een transformer of estimator heeft een eigen unieke id, die wordt gebruikt bij het opgeven van parameters. Beide gebruiken een uniforme API voor het opgeven van deze parameters.
Voorbeeld van pijplijn
Ter illustratie van een praktisch gebruik van een ML-pijplijn wordt in dit voorbeeld het voorbeeldgegevensbestand HVAC.csv
gebruikt dat vooraf is geladen in de standaardopslag voor uw HDInsight-cluster, azure storage of Data Lake Storage. Als u de inhoud van het bestand wilt weergeven, gaat u naar de /HdiSamples/HdiSamples/SensorSampleData/hvac
map.
HVAC.csv
bevat een reeks tijden met zowel de beoogde als de werkelijke temperaturen voor HVAC-systemen (verwarming, ventilatie en airconditioning) in verschillende gebouwen. Het doel is om het model te trainen op basis van de gegevens en een prognosetemperatuur voor een bepaald gebouw te produceren.
De volgende code:
- Definieert een
LabeledDocument
, waarin deBuildingID
opgeslagen ,SystemInfo
(de id en leeftijd van een systeem) en eenlabel
(1,0 als het gebouw te heet is, 0,0 anders). - Hiermee maakt u een aangepaste parserfunctie
parseDocument
die een regel (rij) met gegevens gebruikt en bepaalt of het gebouw 'heet' is door de doeltemperatuur te vergelijken met de werkelijke temperatuur. - Past de parser toe bij het extraheren van de brongegevens.
- Hiermee maakt u trainingsgegevens.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
Deze voorbeeldpijplijn heeft drie fasen: Tokenizer
en HashingTF
(beide transformatoren) en Logistic Regression
(een estimator). De geëxtraheerde en geparseerde gegevens in het training
DataFrame stromen door de pijplijn wanneer pipeline.fit(training)
wordt aangeroepen.
- De eerste fase,
Tokenizer
, splitst deSystemInfo
invoerkolom (bestaande uit de systeem-id en leeftijdswaarden) in eenwords
uitvoerkolom. Deze nieuwewords
kolom wordt toegevoegd aan het DataFrame. - De tweede fase,
HashingTF
, converteert de nieuwewords
kolom naar functievectoren. Deze nieuwefeatures
kolom wordt toegevoegd aan het DataFrame. Deze eerste twee fasen zijn Transformers. - De derde fase,
LogisticRegression
, is een estimator en daarom roept de pijplijn deLogisticRegression.fit()
methode aan om eenLogisticRegressionModel
te produceren.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Voer een PipelineModel.transform()
methode uit op het oorspronkelijke DataFrame om de nieuwe words
kolommen en features
te zien die zijn toegevoegd door de Tokenizer
transformers en HashingTF
een voorbeeld van de LogisticRegression
estimator. In de productiecode is de volgende stap het doorgeven van een Test DataFrame om de training te valideren.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Het model
object kan nu worden gebruikt om voorspellingen te doen. Zie Apache Spark machine learning-toepassingen bouwen in Azure HDInsight voor het volledige voorbeeld van deze machine learning-toepassing en stapsgewijze instructies voor het uitvoeren ervan.