Κοινή χρήση μέσω


Δημιουργία μοντέλου εκμάθησης μηχανής με το Apache Spark MLlib

Σε αυτό το άρθρο, θα μάθετε πώς μπορείτε να χρησιμοποιήσετε το Apache Spark MLlib για να δημιουργήσετε μια εφαρμογή εκμάθησης μηχανής που χειρίζεται μια απλή προγνωστική ανάλυση σε ένα ανοικτό σύνολο δεδομένων Azure. Το Spark παρέχει ενσωματωμένες βιβλιοθήκες εκμάθησης μηχανής. Αυτό το παράδειγμα χρησιμοποιεί ταξινόμηση μέσω λογιστικής παλινδρόμησης.

Οι βασικές βιβλιοθήκες SparkML και MLlib Spark παρέχουν πολλά βοηθητικά προγράμματα που είναι χρήσιμα για εργασίες εκμάθησης μηχανής. Αυτά τα βοηθητικά προγράμματα είναι κατάλληλα για:

  • Ταξινόμηση
  • Συμπλέγματος
  • Δοκιμή υποθέσεων και υπολογισμός δειγμάτων στατιστικών στοιχείων
  • Οπισθοδρόμηση
  • Αποδόμηση μοναδικής τιμής (SVD) και ανάλυση κύριων στοιχείων (PCA)
  • Μοντελοποίηση θέματος

Κατανόηση της ταξινόμησης και της λογιστικής παλινδρόμησης

Η ταξινόμηση, μια δημοφιλής εργασία εκμάθησης μηχανής, περιλαμβάνει την ταξινόμηση δεδομένων εισόδου σε κατηγορίες. Ένας αλγόριθμος ταξινόμησης θα πρέπει να καταλάβει πώς μπορείτε να αντιστοιχίσετε ετικέτες στα παρεχόμενα δεδομένα εισόδου. Για παράδειγμα, ένας αλγόριθμος εκμάθησης μηχανής θα μπορούσε να δεχτεί πληροφορίες μετοχών ως δεδομένα εισόδου και να διαιρέσει το απόθεμα σε δύο κατηγορίες: αποθέματα που θα πρέπει να πουλήσετε και αποθέματα που θα πρέπει να διατηρήσετε.

Ο αλγόριθμος λογιστικής παλινδρόμησης είναι χρήσιμος για την ταξινόμηση. Το API λογιστικής παλινδρόμησης Spark είναι χρήσιμο για τη δυαδική ταξινόμηση δεδομένων εισόδου σε μία από δύο ομάδες. Για περισσότερες πληροφορίες σχετικά με την λογιστική παλινδρόμηση, ανατρέξτε στην Wikipedia.

Η λογιστική παλινδρόμηση παράγει μια λογιστική συνάρτηση που μπορεί να προβλέψει την πιθανότητα ότι ένα διάνυσμα εισόδου ανήκει στη μία ή την άλλη ομάδα.

Παράδειγμα προγνωστικής ανάλυσης δεδομένων ταξί NYC

Πρώτα, εγκαταστήστε azureml-opendatasetsτο . Τα δεδομένα είναι διαθέσιμα μέσω του πόρου Azure Open Datasets . Αυτό το υποσύνολο δεδομένων φιλοξενεί πληροφορίες σχετικά με διαδρομές με κίτρινα ταξί, όπως τις ώρες έναρξης, τις ώρες λήξης, τις τοποθεσίες έναρξης, τις τελικές τοποθεσίες, το κόστος διαδρομής και άλλα χαρακτηριστικά.

%pip install azureml-opendatasets

Το υπόλοιπο αυτού του άρθρου βασίζεται στο Apache Spark για να εκτελέσει πρώτα κάποια ανάλυση σχετικά με τα δεδομένα συμβουλής ταξί της Νέας Υόρκης και, στη συνέχεια, να αναπτύξει ένα μοντέλο για να προβλέψει εάν ένα συγκεκριμένο ταξίδι περιλαμβάνει μια συμβουλή ή όχι.

Δημιουργία μοντέλου εκμάθησης μηχανής Apache Spark

  1. Δημιουργήστε ένα σημειωματάριο PySpark. Για περισσότερες πληροφορίες, ανατρέξτε στην ενότητα Δημιουργία σημειωματάριου.

  2. Εισαγάγετε τους τύπους που απαιτούνται για αυτό το σημειωματάριο.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Θα χρησιμοποιήσουμε το MLflow για να παρακολουθούμε τα πειράματα εκμάθησης μηχανής και τις αντίστοιχες εκτελέσεις. Εάν είναι ενεργοποιημένη η αυτόματη καταγραφή microsoft Fabric, τα αντίστοιχα μετρικά και παράμετροι καταγράφονται αυτόματα.

    import mlflow
    

Κατασκευή του dataFrame εισόδου

Αυτό το παράδειγμα φορτώνει τα δεδομένα σε ένα πλαίσιο δεδομένων Pandas και, στη συνέχεια, τα μετατρέπει σε ένα πλαίσιο δεδομένων Apache Spark. Σε αυτή τη μορφή, μπορούμε να εφαρμόσουμε άλλες λειτουργίες Apache Spark για εκκαθάριση και φιλτράρισμα του συνόλου δεδομένων.

  1. Επικολλήστε αυτές τις γραμμές σε ένα νέο κελί και εκτελέστε τις για να δημιουργήσετε ένα Spark DataFrame. Αυτό το βήμα ανακτά τα δεδομένα μέσω του API Ανοικτών συνόλων δεδομένων. Μπορούμε να φιλτράρουμε αυτά τα δεδομένα για να εξετάσουμε ένα συγκεκριμένο παράθυρο δεδομένων. Το παράδειγμα start_date κώδικα χρησιμοποιεί και end_date για να εφαρμόσει ένα φίλτρο που επιστρέφει έναν μήνα δεδομένων.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Αυτός ο κώδικας μειώνει το σύνολο δεδομένων σε περίπου 10.000 γραμμές. Για να επιταχύνουμε την ανάπτυξη και την εκπαίδευση, τα δείγματα κώδικα με βάση το σύνολο δεδομένων μας προς το παρόν.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Θέλουμε να εξετάσουμε τα δεδομένα μας χρησιμοποιώντας την ενσωματωμένη display() εντολή. Με αυτήν την εντολή, μπορούμε εύκολα να προβάλουμε ένα δείγμα δεδομένων ή να εξερευνήσουμε γραφικά τις τάσεις στα δεδομένα.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Προετοιμασία των δεδομένων

Η προετοιμασία δεδομένων είναι ένα κρίσιμο βήμα στη διαδικασία εκμάθησης μηχανής. Περιλαμβάνει την εκκαθάριση, τον μετασχηματισμό και την οργάνωση ανεπεξέργαστων δεδομένων, προκειμένου να καταστούν κατάλληλα για ανάλυση και μοντελοποίηση. Σε αυτό το δείγμα κώδικα, εκτελείτε διάφορα βήματα προετοιμασίας δεδομένων:

  • Φιλτράρισμα του συνόλου δεδομένων για την κατάργηση έκτοπων και λανθασμένων τιμών
  • Κατάργηση στηλών που δεν είναι απαραίτητες για την εκπαίδευση μοντέλου
  • Δημιουργία νέων στηλών από τα ανεπεξέργαστα δεδομένα
  • Δημιουργήστε μια ετικέτα για να προσδιορίσετε αν μια συγκεκριμένη διαδρομή με ταξί περιλαμβάνει μια συμβουλή
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Στη συνέχεια, κάντε μια δεύτερη μεταβίβαση των δεδομένων για να προσθέσετε τις τελικές δυνατότητες.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Δημιουργία μοντέλου λογιστικής παλινδρόμησης

Η τελική εργασία μετατρέπει τα δεδομένα με ετικέτες σε μια μορφή που μπορεί να χειριστεί η λογιστική παλινδρόμηση. Η είσοδος σε έναν αλγόριθμο λογιστικής παλινδρόμησης πρέπει να έχει μια δομή διανυσματικών ζευγών ετικέτας/δυνατότητας, όπου το διάνυσμα δυνατότητας είναι ένα διάνυσμα αριθμών που αντιπροσωπεύουν το σημείο εισόδου.

Με βάση τις απαιτήσεις τελικής εργασίας, πρέπει να μετατρέψουμε τις κατηγορικές στήλες σε αριθμούς. Συγκεκριμένα, πρέπει να μετατρέψουμε τις trafficTimeBins στήλες και weekdayString σε ακέραιες αναπαραστάσεις. Έχουμε πολλές διαθέσιμες επιλογές για τον χειρισμό αυτής της απαίτησης. Αυτό το παράδειγμα περιλαμβάνει την OneHotEncoder προσέγγιση:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Αυτή η ενέργεια έχει ως αποτέλεσμα ένα νέο DataFrame με όλες τις στήλες στην κατάλληλη μορφή για την εκπαίδευση ενός μοντέλου.

Εκπαίδευση μοντέλου λογιστικής παλινδρόμησης

Η πρώτη εργασία διαιρεί το σύνολο δεδομένων σε ένα σύνολο εκπαίδευσης και ένα σύνολο δοκιμών ή επικύρωσης.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Όταν έχουμε δύο DataFrame, πρέπει να δημιουργήσουμε τον τύπο μοντέλου και να τον εκτελέσουμε στο dataFrame εκπαίδευσης. Στη συνέχεια, μπορούμε να κάνουμε επικύρωση σε σχέση με το dataFrame δοκιμής. Πειραματιστείτε με διαφορετικές εκδόσεις του τύπου μοντέλου για να δείτε τα αποτελέσματα διαφορετικών συνδυασμών.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Το κελί εξάγει:

Area under ROC = 0.9749430523917996

Δημιουργία οπτικής αναπαράστασης της πρόβλεψης

Τώρα, μπορούμε να δημιουργήσουμε μια τελική απεικόνιση για να ερμηνεύσουμε τα αποτελέσματα του μοντέλου. Μια καμπύλη ROC μπορεί σίγουρα να παρουσιάσει το αποτέλεσμα.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Γράφημα που εμφανίζει την καμπύλη ROC για λογιστική παλινδρόμηση στο μοντέλο συμβουλών.