Tworzenie potoku uczenia maszynowego platformy Apache Spark
Skalowalna biblioteka uczenia maszynowego (MLlib) platformy Apache Spark udostępnia możliwości modelowania w środowisku rozproszonym. Pakiet spark.ml
Spark to zestaw interfejsów API wysokiego poziomu opartych na obiektach DataFrame. Te interfejsy API ułatwiają tworzenie i dostrajanie praktycznych potoków uczenia maszynowego.
Uczenie maszynowe platformy Spark odnosi się do tego interfejsu API opartego na ramce danych MLlib, a nie starszego interfejsu API potoku opartego na RDD.
Potok uczenia maszynowego to kompletny przepływ pracy łączący wiele algorytmów uczenia maszynowego. Istnieje wiele kroków wymaganych do przetwarzania i uczenia się na podstawie danych, co wymaga sekwencji algorytmów. Potoki definiują etapy i kolejność procesu uczenia maszynowego. W języku MLlib etapy potoku są reprezentowane przez określoną sekwencję PipelineStages, gdzie funkcja Transformer i narzędzie do szacowania wykonują zadania.
Funkcja Transformer to algorytm, który przekształca jedną ramkę danych na drugą transform()
przy użyciu metody . Na przykład funkcja przekształcania funkcji może odczytywać jedną kolumnę ramki danych, mapować ją na inną kolumnę i wyprowadzać nową ramkę danych z dołączonym do niej mapowaną kolumną.
Narzędzie do szacowania to abstrakcja algorytmów uczenia i odpowiada za dopasowywanie lub trenowanie zestawu danych w celu utworzenia funkcji Transformer. Narzędzie do szacowania implementuje metodę o nazwie fit()
, która akceptuje ramkę danych i tworzy ramkę danych, która jest transformatorem.
Każde wystąpienie bezstanowe transformatora lub narzędzia do szacowania ma własny unikatowy identyfikator, który jest używany podczas określania parametrów. Oba używają jednolitego interfejsu API do określania tych parametrów.
Przykład potoku
Aby zademonstrować praktyczne użycie potoku uczenia maszynowego, w tym przykładzie użyto przykładowego HVAC.csv
pliku danych, który jest wstępnie załadowany do domyślnego magazynu dla klastra usługi HDInsight, usługi Azure Storage lub Data Lake Storage. Aby wyświetlić zawartość pliku, przejdź do /HdiSamples/HdiSamples/SensorSampleData/hvac
katalogu.
HVAC.csv
zawiera zestaw czasów z temperaturami docelowymi i rzeczywistymi dla systemów grzewczo-wentylacyjnych (ogrzewania, wentylacji i klimatyzacji) w różnych budynkach. Celem jest wytrenowanie modelu na danych i wygenerowanie prognozy temperatury dla danego budynku.
Następujący kod powoduje:
-
LabeledDocument
Definiuje element , który przechowujeBuildingID
element ,SystemInfo
(identyfikator systemu i wiek) orazlabel
wartość (1.0, jeśli budynek jest zbyt gorący, 0,0 w przeciwnym razie). - Tworzy niestandardową funkcję
parseDocument
analizatora, która przyjmuje wiersz (wiersz) danych i określa, czy budynek jest "gorący", porównując temperaturę docelową z rzeczywistą temperaturą. - Stosuje analizator podczas wyodrębniania danych źródłowych.
- Tworzy dane szkoleniowe.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
Ten przykładowy potok ma trzy etapy: Tokenizer
i HashingTF
(transformatory) i Logistic Regression
(narzędzie do szacowania). Wyodrębnione i przeanalizowane dane w ramce training
danych przepływa przez potok po pipeline.fit(training)
wywołaniu.
- Pierwszy etap dzieli
Tokenizer
kolumnę wejściowąSystemInfo
(składającą się z identyfikatora systemu i wartości wieku) na kolumnę wyjściowąwords
. Ta nowawords
kolumna jest dodawana do ramki danych. - Drugi etap ,
HashingTF
konwertuje nowąwords
kolumnę na wektory cech. Ta nowafeatures
kolumna jest dodawana do ramki danych. Te pierwsze dwa etapy to Transformatory. - Trzeci etap,
LogisticRegression
, jest narzędzie do szacowania, a więc potok wywołujeLogisticRegression.fit()
metodę w celu utworzenia klasyLogisticRegressionModel
.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Aby wyświetlić nowe words
kolumny i features
dodane przez Tokenizer
transformatory i HashingTF
oraz próbkę LogisticRegression
narzędzia do szacowania, uruchom metodę PipelineModel.transform()
w oryginalnej ramce danych. W kodzie produkcyjnym następnym krokiem będzie przekazanie do testowej ramki danych w celu zweryfikowania trenowania.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Obiekt model
może teraz służyć do przewidywania. Aby zapoznać się z pełnym przykładem tej aplikacji uczenia maszynowego i instrukcjami krok po kroku dotyczącymi jej uruchamiania, zobacz Tworzenie aplikacji uczenia maszynowego platformy Apache Spark w usłudze Azure HDInsight.