Condividi tramite


Esercitazione: creare un'app di Machine Learning con MLlib di Apache Spark e Azure Synapse Analytics

Questo articolo illustra come usare MLlib di Apache Spark per creare un'applicazione di Machine Learning che esegue una semplice analisi predittiva su un set di dati aperto di Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.

SparkML e MLlib sono librerie Spark di base che offrono diverse utilità in grado di agevolare le attività di apprendimento automatico, incluse utilità adatte a:

  • Classificazione
  • Regressione
  • Cluster
  • Modellazione di argomenti
  • Scomposizione di valori singolari e analisi in componenti principali
  • Testing e calcolo ipotetici di statistiche di esempio

Informazioni sulla classificazione e la regressione logistica

La classificazione, un'attività comune di apprendimento automatico, è il processo di ordinamento dei dati in categorie. È il processo eseguito da un algoritmo di classificazione per determinare come assegnare etichette ai dati di input specificati. Si pensi, ad esempio, a un algoritmo di apprendimento automatico che accetta informazioni sulle azioni come input e divide le azioni in due categorie: azioni da vendere e azioni da conservare.

La regressione logistica è un algoritmo usato per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.

Riepilogando, il processo di regressione logistica genera una funzione logistica che è possibile usare per stimare la probabilità che un vettore di input appartenga a un gruppo o all'altro.

Esempio di analisi predittiva di dati relativi alla rete taxi di New York

In questo esempio, viene usato Spark per eseguire un'analisi predittiva dei dati relativi alle mance sui taxi di New York. I dati sono disponibili tramite set di dati aperti di Azure. Questo subset del set di dati contiene informazioni sulle corse in taxi, incluse informazioni su ogni corsa, l'ora di partenza e di arrivo, i percorsi, i costi e altri attributi interessanti.

Importante

Per il pull dei dati dalla posizione di archiviazione potrebbero essere addebitati costi aggiuntivi.

Nei passaggi seguenti viene sviluppato un modello per stimare se una particolare corsa includa una mancia o meno.

Creare un modello di Machine Learning con Apache Spark

  1. Creare un notebook usando il kernel PySpark. Per le istruzioni, vedere Creare un notebook.

  2. Importare i tipi richiesti per l'applicazione. Copiare e incollare il codice seguente in una cella vuota e quindi premere Maiusc+Invio. In alternativa, eseguire la cella usando l'icona di riproduzione blu a sinistra del codice.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Dato che è stato usato il kernel PySpark, non è necessario creare contesti in modo esplicito. Il contesto Spark viene creata automaticamente quando si esegue la prima cella di codice.

Creare il DataFrame di input

Poiché i dati non elaborati sono in formato Parquet, è possibile usare il contesto di Spark per eseguire direttamente il pull del file in memoria come DataFrame. Anche se nei passaggi seguenti il codice usa le opzioni predefinite, se necessario è possibile forzare il mapping dei tipi di dati e di altri attributi dello schema.

  1. Eseguire le righe seguenti per creare un DataFrame di Spark incollando il codice in una nuova cella. Questo passaggio consente di recuperare dati tramite l'API dei set di dati aperti di Azure. Il pull di tutti questi dati genera circa 1,5 miliardi righe.

    A seconda delle dimensioni del pool di Apache Spark serverless, i dati non elaborati potrebbero essere troppo grandi o richiedere troppo tempo per poter essere usati. È possibile filtrare i dati fino a ottenere un valore inferiore. L'esempio di codice seguente usa start_date e end_date per applicare un filtro che restituisce un singolo mese di dati.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. Lo svantaggio della semplice applicazione di un filtro è che, da una prospettiva statistica, potrebbe introdurre distorsioni nei dati. Un altro approccio consiste nell'usare il campionamento incorporato in Spark.

    Il codice seguente riduce il set di dati fino a circa 2,000 righe, se applicato dopo il codice precedente. È possibile usare questo passaggio di campionamento al posto del semplice filtro o assieme ad esso.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. È ora possibile esaminare i dati per vedere cosa è stato letto. In genere è preferibile rivedere i dati con un subset anziché il set completo, a seconda delle dimensioni del set di dati.

    Il codice seguente offre due modi per visualizzare i dati. Il primo modo è di base. Il secondo modo offre un'esperienza di griglia molto più completa, insieme alla possibilità di visualizzare i dati graficamente.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. A seconda delle dimensioni del set di dati generato e della necessità di sperimentare o eseguire il notebook più volte, si può desiderare di memorizzare nella cache il set di dati in locale nell'area di lavoro. Esistono tre modi per eseguire la memorizzazione esplicita nella cache:

    • Salvare il DataFrame in locale come file.
    • Salvare il DataFrame come tabella o vista temporanea.
    • Salvare il DataFrame come tabella permanente.

Negli esempi di codice seguenti sono inclusi i primi due approcci.

La creazione di una vista o di una tabella temporanea offre percorsi di accesso diversi ai dati, ma è disponibile solo per la durata della sessione dell'istanza di Spark.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

Preparare i dati

I dati nel relativo formato non elaborato spesso non sono adatti per il passaggio diretto a un modello. È necessario eseguire una serie di azioni sui dati per ottenerli in uno stato in cui il modello possa usarli.

Nel codice seguente si eseguono quattro classi di operazioni:

  • Rimozione dei valori outlier o non corretti tramite filtro.
  • Rimozione delle colonne non necessarie.
  • La creazione di nuove colonne derivate dai dati non elaborati per far funzionare il modello in modo più efficace. Questa operazione viene talvolta definita definizione delle funzionalità.
  • Assegnazione di etichette. Dato che si sta eseguendo la classificazione binaria (una determinata corsa prevederà o meno una mancia?), è necessario convertire l'importo della mancia in un valore 0 o 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Viene quindi eseguito un secondo passaggio sui dati per aggiungere le funzionalità finali.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Creare un modello di regressione logistica

L'ultima attività è la conversione dei dati con etichetta in un formato che possa essere analizzato tramite la regressione logistica. L'input per un algoritmo di regressione logistica deve essere un set di coppie etichetta-vettore di funzionalità, dove il vettore di funzionalità è un vettore di numeri che rappresenta il punto di ingresso.

Quindi, è necessario convertire le colonne categoriche in numeri. In particolare, è necessario convertire le colonne trafficTimeBins e weekdayString in rappresentazioni integer. Esistono diversi approcci all’esecuzione della conversione. Nell'esempio seguente viene usato l'approccio OneHotEncoder, che è diffuso.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Questa azione genera un nuovo DataFrame con tutte le colonne nel formato appropriato per eseguire il training di un modello.

Eseguire il training di un modello di regressione logistica

La prima attività consiste nel suddividere il set di dati in un set di training e in un set di test o di convalida. La divisione qui è arbitraria. Sperimentare diverse impostazioni di suddivisione per verificare se influiscono sul modello.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Ora che sono presenti due DataFrame, l'attività successiva consiste nel creare la formula del modello ed eseguirla rispetto al DataFrame di training. È quindi possibile eseguire la convalida rispetto al DataFrame di test. Provare versioni diverse della formula del modello per vedere l'effetto di combinazioni diverse.

Nota

Per salvare il modello, assegnare il ruolo Collaboratore ai dati del BLOB di archiviazione all'ambito della risorsa del server di database SQL di Azure. Per la procedura dettagliata, vedere Assegnare ruoli di Azure usando il portale di Azure. Solo i membri con i privilegi di proprietario possono eseguire questo passaggio.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

L'output di questa cella è:

Area under ROC = 0.9779470729751403

Creare una rappresentazione visiva della stima

È ora possibile creare una visualizzazione finale per comprendere meglio i risultati di questo test. Uno dei modi per rivedere i risultati è attraverso una curva ROC.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.

Arrestare l'istanza di Spark

Al termine dell'esecuzione dell'applicazione, arrestare il notebook per rilasciare le risorse attraverso la chiusura della scheda. In alternativa, selezionare Termina sessione sul riquadro di stato nella parte inferiore del notebook.

Vedi anche

Passaggi successivi

Nota

Parte della documentazione ufficiale di Apache Spark si basa sull'uso della console Spark, che non è disponibile in Apache Spark di Azure Synapse Analytics. Usare al suo posto un notebook o IntelliJ.