Vytvoření modelu strojového učení pomocí knihovny Apache Spark MLlib
V tomto článku se dozvíte, jak pomocí knihovny Apache Spark MLlib vytvořit aplikaci strojového učení, která zpracovává jednoduchou prediktivní analýzu v otevřené datové sadě Azure. Spark poskytuje integrované knihovny strojového učení. Tento příklad používá klasifikaci prostřednictvím logistické regrese.
Základní knihovny SparkML a MLlib Spark poskytují mnoho nástrojů, které jsou užitečné pro úlohy strojového učení. Tyto nástroje jsou vhodné pro:
- Klasifikace
- Clustering
- Testování hypotéz a výpočet vzorové statistiky
- Regrese
- Rozklad hodnoty s jednotným číslem (SVD) a analýza hlavních komponent (PCA)
- Modelování témat
Vysvětlení klasifikace a logistické regrese
Klasifikace, oblíbená úloha strojového učení, zahrnuje řazení vstupních dat do kategorií. Klasifikační algoritmus by měl zjistit, jak přiřadit popisky zadaným vstupním datům. Například algoritmus strojového učení může přijímat informace o akciích jako vstup a rozdělit akcie do dvou kategorií: akcie, které byste měli prodávat, a akcie, které byste měli zachovat.
Algoritmus logistické regrese je užitečný pro klasifikaci. Rozhraní API logistické regrese Sparku je užitečné pro binární klasifikaci vstupních dat do jedné ze dvou skupin. Další informace o logistické regresi najdete na Wikipedii.
Logistická regrese vytváří logistickou funkci , která dokáže předpovědět pravděpodobnost, že vstupní vektor patří do jedné skupiny nebo druhé.
Příklad prediktivní analýzy dat taxislužby NYC
Nejprve nainstalujte azureml-opendatasets
. Data jsou k dispozici prostřednictvím prostředku Azure Open Datasets . Tato podmnožina datové sady hostuje informace o žlutých cestách taxíkem, včetně počátečních časů, časů ukončení, umístění startu, koncových umístění, nákladů na jízdu a dalších atributů.
%pip install azureml-opendatasets
Zbytek tohoto článku spoléhá na Apache Spark, aby nejprve provedl analýzu dat tipu taxi-výletu v NYC a pak vyvinul model, který předpovídá, jestli konkrétní cesta obsahuje tip, nebo ne.
Vytvoření modelu strojového učení Apache Sparku
Vytvořte poznámkový blok PySpark. Další informace najdete v tématu Vytvoření poznámkového bloku.
Importujte typy požadované pro tento poznámkový blok.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
MLflow použijeme ke sledování experimentů strojového učení a odpovídajících spuštění. Pokud je povolené automatické protokolování Microsoft Fabric, zachytí se automaticky odpovídající metriky a parametry.
import mlflow
Vytvoření vstupního datového rámce
Tento příklad načte data do datového rámce Pandas a pak je převede na datový rámec Apache Spark. V takovém formátu můžeme k vyčištění a filtrování datové sady použít další operace Apache Sparku.
Vložte tyto řádky do nové buňky a spusťte je a vytvořte datový rámec Sparku. Tento krok načte data prostřednictvím rozhraní API Open Datasets. Tato data můžeme filtrovat, abychom prozkoumali konkrétní okno dat. Příklad kódu používá
start_date
aend_date
použije filtr, který vrátí jeden měsíc dat.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Tento kód zmenšuje datovou sadu na přibližně 10 000 řádků. Pokud chcete urychlit vývoj a trénování, ukázky kódu pro naši datovou sadu prozatím zrychlují.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Chceme se podívat na naše data pomocí integrovaného
display()
příkazu. Pomocí tohoto příkazu můžeme snadno zobrazit ukázku dat nebo graficky prozkoumat trendy v datech.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Příprava dat
Příprava dat je zásadním krokem v procesu strojového učení. Zahrnuje čištění, transformaci a organizaci nezpracovaných dat, aby byla vhodná pro analýzu a modelování. V této ukázce kódu provedete několik kroků přípravy dat:
- Filtrováním datové sady odeberte odlehlé hodnoty a nesprávné hodnoty.
- Odebrání sloupců, které nejsou potřeba pro trénování modelu
- Vytvoření nových sloupců z nezpracovaných dat
- Vygenerování popisku pro určení, jestli daná cesta taxi zahrnuje tip
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
V dalším kroku proveďte druhé předání dat pro přidání konečných funkcí.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Vytvoření logistického regresního modelu
Poslední úloha převede označená data do formátu, který může logistická regrese zpracovat. Vstup do logistického regresního algoritmu musí mít strukturu dvojic vektorů popisku/funkce, kde vektor funkce je vektor čísla, který představuje vstupní bod.
Na základě požadavků na konečný úkol musíme převést sloupce kategorií na čísla. Konkrétně musíme převést trafficTimeBins
sloupce na weekdayString
celočíselné reprezentace. Pro zpracování tohoto požadavku máme k dispozici řadu možností. Tento příklad zahrnuje OneHotEncoder
přístup:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Výsledkem této akce je nový datový rámec se všemi sloupci ve správném formátu pro trénování modelu.
Trénování logistického regresního modelu
První úloha rozdělí datovou sadu na trénovací sadu a testovací nebo ověřovací sadu.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Jakmile máme dva datové rámce, musíme vytvořit vzorec modelu a spustit ho s trénovacím datovým rámcem. Pak můžeme ověřit testovací datový rámec. Experimentujte s různými verzemi vzorce modelu, abyste viděli efekty různých kombinací.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Výstupy buňky:
Area under ROC = 0.9749430523917996
Vytvoření vizuální reprezentace předpovědi
Teď můžeme vytvořit konečnou vizualizaci pro interpretaci výsledků modelu. Křivka ROC může jistě prezentovat výsledek.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Související obsah
- Použití ukázek AI k vytváření modelů strojového učení: Použití ukázek AI
- Sledování spuštění strojového učení pomocí experimentů: Experimenty strojového učení