Udostępnij za pośrednictwem


Tworzenie modelu uczenia maszynowego za pomocą biblioteki MLlib platformy Apache Spark

Z tego artykułu dowiesz się, jak za pomocą biblioteki MLlib platformy Apache Spark utworzyć aplikację uczenia maszynowego, która obsługuje prostą analizę predykcyjną na otwartym zestawie danych platformy Azure. Platforma Spark udostępnia wbudowane biblioteki uczenia maszynowego. W tym przykładzie użyto klasyfikacji za pośrednictwem regresji logistycznej.

Podstawowe biblioteki SparkML i MLlib Spark udostępniają wiele narzędzi, które są przydatne w przypadku zadań uczenia maszynowego. Te narzędzia są odpowiednie dla:

  • Klasyfikacja
  • Klastrowanie
  • Testowanie hipotez i obliczanie przykładowych statystyk
  • Regresja
  • Dekompozycja wartości pojedynczej (SVD) i analiza głównych składników (PCA)
  • Modelowanie tematów

Omówienie klasyfikacji i regresji logistycznej

Klasyfikacja, popularne zadanie uczenia maszynowego, obejmuje sortowanie danych wejściowych w kategorie. Algorytm klasyfikacji powinien ustalić, jak przypisać etykiety do podanych danych wejściowych. Na przykład algorytm uczenia maszynowego może akceptować informacje o zapasach jako dane wejściowe i podzielić akcje na dwie kategorie: akcje, które należy sprzedawać i akcje, które należy zachować.

Algorytm regresji logistycznej jest przydatny do klasyfikacji. Interfejs API regresji logistycznej platformy Spark jest przydatny w przypadku klasyfikacji binarnej danych wejściowych w jedną z dwóch grup. Aby uzyskać więcej informacji na temat regresji logistycznej, zobacz Wikipedia.

Regresja logistyczna tworzy funkcję logistyczną, która może przewidywać prawdopodobieństwo, że wektor wejściowy należy do jednej grupy lub drugiej.

Przykład analizy predykcyjnej danych taksówek w Nowym Jorku

Najpierw zainstaluj program azureml-opendatasets. Dane są dostępne za pośrednictwem zasobu Azure Open Datasets . Ten podzestaw zestawu danych zawiera informacje o żółtych przejazdach taksówek, w tym czasy rozpoczęcia, godziny zakończenia, lokalizacje początkowe, lokalizacje końcowe, koszty podróży i inne atrybuty.

%pip install azureml-opendatasets

Pozostała część tego artykułu opiera się na platformie Apache Spark, aby najpierw przeprowadzić analizę danych porad dotyczących taksówek w Nowym Jorku, a następnie opracować model, aby przewidzieć, czy określona podróż zawiera poradę, czy nie.

Tworzenie modelu uczenia maszynowego platformy Apache Spark

  1. Utwórz notes PySpark. Aby uzyskać więcej informacji, odwiedź stronę Tworzenie notesu.

  2. Zaimportuj typy wymagane dla tego notesu.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Użyjemy platformy MLflow do śledzenia eksperymentów uczenia maszynowego i odpowiednich przebiegów. Jeśli automatyczne rejestrowanie w usłudze Microsoft Fabric jest włączone, odpowiednie metryki i parametry są automatycznie przechwytywane.

    import mlflow
    

Konstruowanie wejściowej ramki danych

W tym przykładzie dane są ładowane do ramki danych biblioteki Pandas, a następnie konwertuje je na ramkę danych platformy Apache Spark. W tym formacie możemy zastosować inne operacje platformy Apache Spark do czyszczenia i filtrowania zestawu danych.

  1. Wklej te wiersze do nowej komórki i uruchom je, aby utworzyć ramkę danych platformy Spark. Ten krok pobiera dane za pośrednictwem interfejsu API Otwórz zestawy danych. Możemy filtrować te dane w dół, aby zbadać określone okno danych. W przykładzie kodu użyto start_date metody i end_date zastosowano filtr, który zwraca jeden miesiąc danych.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Ten kod zmniejsza zestaw danych do około 10 000 wierszy. Aby przyspieszyć programowanie i szkolenie, na razie przykłady kodu w dół naszego zestawu danych.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Chcemy przyjrzeć się naszym danym za pomocą wbudowanego display() polecenia. Za pomocą tego polecenia możemy łatwo wyświetlić przykład danych lub graficznie eksplorować trendy w danych.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Przygotowywanie danych

Przygotowywanie danych to kluczowy krok w procesie uczenia maszynowego. Obejmuje to czyszczenie, przekształcanie i organizację danych pierwotnych w celu przygotowania ich do analizy i modelowania. W tym przykładzie kodu wykonasz kilka kroków przygotowywania danych:

  • Filtrowanie zestawu danych w celu usunięcia wartości odstających i nieprawidłowych
  • Usuwanie kolumn, które nie są potrzebne do trenowania modelu
  • Tworzenie nowych kolumn na podstawie danych pierwotnych
  • Generowanie etykiety w celu określenia, czy dana podróż taksówką obejmuje poradę
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Następnie utwórz drugie przekazanie danych, aby dodać końcowe funkcje.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Tworzenie modelu regresji logistycznej

Ostatnie zadanie konwertuje oznaczone dane na format, który może obsługiwać regresja logistyczna. Dane wejściowe algorytmu regresji logistycznej muszą mieć strukturę par wektorów etykiet/cech, gdzie wektor funkcji jest wektorem liczb reprezentujących punkt wejściowy.

Na podstawie końcowych wymagań dotyczących zadań musimy przekonwertować kolumny kategorii na liczby. W szczególności musimy przekonwertować kolumny trafficTimeBins i weekdayString na reprezentacje całkowite. Dostępnych jest wiele opcji obsługi tego wymagania. Ten przykład obejmuje OneHotEncoder podejście:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Ta akcja powoduje utworzenie nowej ramki danych ze wszystkimi kolumnami w odpowiednim formacie w celu wytrenowania modelu.

Trenowanie modelu regresji logistycznej

Pierwsze zadanie dzieli zestaw danych na zestaw szkoleniowy oraz zestaw testów lub walidacji.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Gdy mamy dwie ramki danych, musimy utworzyć formułę modelu i uruchomić ją względem ramki danych trenowania. Następnie możemy sprawdzić poprawność względem testowej ramki danych. Poeksperymentuj z różnymi wersjami formuły modelu, aby zobaczyć efekty różnych kombinacji.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Dane wyjściowe komórki:

Area under ROC = 0.9749430523917996

Tworzenie wizualnej reprezentacji przewidywania

Teraz możemy utworzyć ostateczną wizualizację w celu zinterpretowania wyników modelu. Krzywa ROC może z pewnością przedstawić wynik.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Wykres przedstawiający krzywą ROC dla regresji logistycznej w modelu porad.