Koneoppimismallin luominen Apache Spark MLlib:n avulla
Tässä artikkelissa kerrotaan, miten voit Apache Spark MLlibin avulla luoda koneoppimissovelluksen, joka käsittelee yksinkertaista ennakoivaa analyysia Azuren avoimesta tietojoukosta. Spark tarjoaa sisäisiä koneoppimiskirjastoja. Tässä esimerkissä luokitusta käytetään logistista regressiota käyttämällä.
SparkML- ja MLlib Spark -ydinkirjastot tarjoavat monia apuohjelmia, joista on hyötyä koneoppimistehtävissä. Nämä apuohjelmat soveltuvat seuraaviin:
- Luokitus
- Klusterointi
- Hypoteesitestaus ja mallitilastojen laskeminen
- Regressio
- SVD-hajotus ja pääosa-analyysi (PCA)
- Aiheen mallinnus
Tutustu luokitukseen ja logistiseen regressioon
Luokittelu, suosittu koneoppimistehtävä, sisältää syötetietojen lajittelemisen luokkiin. Luokitusalgoritmin tulisi selvittää, miten annetuille syötetiedille määritetään otsikoita . Esimerkiksi koneoppimisalgoritmi voisi hyväksyä varastotiedot syötteeksi ja jakaa osakkeet kahteen luokkaan: osakkeisiin, joita sinun pitäisi myydä, ja osakkeisiin, jotka sinun pitäisi säilyttää.
Logistinen regressioalgoritmi on hyödyllinen luokituksessa. Spark-logistinen regressio-ohjelmointirajapinta on hyödyllinen syötetietojen binaariluokittelussa yhteen kahdesta ryhmästä. Lisätietoja logistisesta regressiosta on Wikipediassa.
Logistinen regressio tuottaa logistisen funktion , joka voi ennustaa todennäköisyyden sille, että syötevektori kuuluu yhteen tai toiseen ryhmään.
Ennakoiva analyysi esimerkki New Yorkin kaupungin taksitiedoista
Asenna azureml-opendatasets
ensin . Tiedot ovat käytettävissä Azure Open Datasets -resurssin kautta. Tässä tietojoukon alijoukossa isännöidä tietoja keltaisista taksimatkoista, mukaan lukien alkamisajat, päättymisajat, alkamissijainnit, päättymissijainnit, matkakustannukset ja muut määritteet.
%pip install azureml-opendatasets
Artikkelin loppuosa käyttää Apache Sparkiä ensin tekemään analyysin NYC:n taksimatkan vinkkitiedoista ja kehittämään sitten mallin, joka ennustaa, sisältääkö tietty matka vinkin vai ei.
Luo Apache Spark -koneoppimismalli
Luo PySpark-muistikirja. Lisätietoja on kohdassa Muistikirjan luominen.
Tuo tähän muistikirjaan tarvittavat tyypit.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Käytämme MLflow-funktiota koneoppimiskokeilujen ja sitä vastaavien suoritusten seurantaan. Jos Microsoft Fabricin automaattinen loggaus on käytössä, vastaavat mittarit ja parametrit siepataan automaattisesti.
import mlflow
Muodosta syötteen DataFrame
Tämä esimerkki lataa tiedot Pandas-tietokehykseen ja muuntaa ne sitten Apache Spark -tietokehykseksi. Tässä muodossa voimme käyttää muita Apache Spark -toimintoja tietojoukon siistimiseksi ja suodattamiseksi.
Liitä nämä rivit uuteen soluun ja suorita ne Spark DataFrame -kehyksen luomiseksi. Tämä vaihe noutaa tiedot Avointen tietojoukkojen ohjelmointirajapinnan kautta. Voimme suodattaa nämä tiedot alaspäin ja tarkastella tiettyä tietoikkunaa. Koodiesimerkki käyttää
start_date
jaend_date
käyttää suodatinta, joka palauttaa yhden kuukauden tiedot.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Tämä koodi pienentää tietojoukon noin 10 000 riviin. Jos haluat nopeuttaa kehitystä ja koulutusta, koodi ottaa näytteitä tietojoukostamme toistaiseksi.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Haluamme tarkastella tietojamme käyttämällä sisäistä komentoa
display()
. Tämän komennon avulla voimme helposti tarkastella tietomallia tai tutkia tietojen trendejä graafisesti.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Tietojen valmistelu
Tietojen valmistelu on tärkeä vaihe koneoppimisprosessissa. Se sisältää siistimisen, muuntamisen ja raakadatan järjestämisen, jotta se sopii analysointiin ja mallinnukseen. Tässä koodimallissa suoritat useita tietojen valmistelun vaiheita:
- Poista poikkeavat arvot ja virheelliset arvot suodattamalla tietojoukko
- Poista sarakkeet, joita ei tarvita mallin harjoittamisessa
- Luo uusia sarakkeita raakatiedoista
- Luo tunniste sen määrittämiseksi, liittyykö annettuun taksimatkaan vinkki
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Lisää sitten lopulliset ominaisuudet lisäämällä tietojen toinen välitys.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Logistisen regressiomallin luominen
Viimeisessä tehtävässä otsikoidut tiedot muunnetaan muotoon, jota logistinen regressio pystyy käsittelemään. Logistisen regressioalgoritmin syötteellä on oltava otsikko-/ominaisuusvektoriparien rakenne, jossa ominaisuusvektori on syötepistettä edustavien lukujen vektori.
Lopullisten tehtävän vaatimusten perusteella luokittaiset sarakkeet on muunnettava luvuiksi. Ja -sarakkeet on muunnettava trafficTimeBins
weekdayString
kokonaislukumuotoiseksi esitykseksi. Meillä on käytettävissämme monia vaihtoehtoja tämän vaatimuksen käsittelemiseksi. Tässä esimerkissä käsitellään seuraavaa OneHotEncoder
lähestymistapaa:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Tämän toiminnon tuloksena saadaan uusi DataFrame, jossa kaikki sarakkeet ovat oikeassa muodossa mallin harjoittamiseksi.
Logistisen regressiomallin harjoittaminen
Ensimmäisessä tehtävässä tietojoukko jaetaan harjoitusjoukkoon ja testaus- tai vahvistusjoukkoon.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Kun käytössä on kaksi DataFrame-kehystä, meidän täytyy luoda mallikaava ja suorittaa se harjoittamisen DataFrame-kehyksessä. Sitten voimme vahvistaa testin dataFrame-kehyksen avulla. Kokeile mallikaavan eri versioita, jotta näet eri yhdistelmien vaikutukset.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Solun tuloksena on:
Area under ROC = 0.9749430523917996
Visuaalisen esityksen luominen ennusteesta
Voimme nyt luoda lopullisen visualisoinnin mallin tulosten tulkitsemiseksi. ROC-käyrä voi varmasti esittää tuloksen.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Liittyvä sisältö
- Tekoälynäytteiden käyttö koneoppimismallien luomiseen: Tekoälynäytteiden käyttäminen
- Koneoppimisten suoritusten seuraaminen kokeiden avulla: koneoppimiskokeilut