Поделиться через


Создание модели машинного обучения с помощью Apache Spark MLlib

Из этой статьи вы узнаете, как использовать Apache Spark MLlib для создания приложения машинного обучения, обрабатывающего простой прогнозный анализ в открытом наборе данных Azure. Apache Spark содержит встроенные библиотеки машинного обучения. В этом примере используется классификация с помощью логистической регрессии.

Основные библиотеки SparkML и MLlib Spark предоставляют множество служебных программ, полезных для задач машинного обучения. Эти служебные программы подходят для следующих целей:

  • Классификация
  • Кластеризация
  • проверки гипотез и статистической выборки.
  • Регрессия
  • сингулярного разложения и анализа по методу главных компонент;
  • тематического моделирования;

Общие сведения о классификации и логистической регрессии

Классификация, популярная задача машинного обучения, включает сортировку входных данных в категории. Алгоритм классификации должен выяснить, как назначать метки предоставленным входным данным. Например, алгоритм машинного обучения может принимать данные акций в качестве входных данных и разделить акции на две категории: акции, которые следует продавать и акции, которые следует хранить.

Алгоритм логистической регрессии полезен для классификации. API логистической регрессии Spark полезен для двоичной классификации входных данных в одну из двух групп. Дополнительные сведения о логистической регрессии см. на соответствующей вики-странице (на английском языке).

Логистическая регрессия создает логистическую функцию , которая может предсказать вероятность того, что входной вектор принадлежит одной группе или другой.

Пример прогнозного анализа данных такси Нью-Йорка

Сначала установите azureml-opendatasets. Данные доступны через ресурс Azure Open Datasets . В этом подмножестве набора данных содержатся сведения о желтых поездках на такси, включая время начала, время окончания, места начала, места окончания, затраты на поездки и другие атрибуты.

%pip install azureml-opendatasets

Остальная часть этой статьи основана на Apache Spark, чтобы сначала выполнить некоторый анализ на основе данных советов о поездке в такси Нью-Йорка, а затем разработать модель для прогнозирования того, включает ли конкретная поездка совет или нет.

Создание модели машинного обучения Apache Spark

  1. Создайте записную книжку PySpark. Дополнительные сведения см. в статье "Создание записной книжки".

  2. Импортируйте типы, необходимые для этой записной книжки.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Мы будем использовать MLflow для отслеживания экспериментов машинного обучения и соответствующих запусков. Если включена автоматическая запись Microsoft Fabric, соответствующие метрики и параметры автоматически записываются.

    import mlflow
    

Создание входного кадра данных

В этом примере данные загружаются в кадр данных Pandas, а затем преобразуются в кадр данных Apache Spark. В этом формате можно применить другие операции Apache Spark для очистки и фильтрации набора данных.

  1. Вставьте эти строки в новую ячейку и запустите их для создания кадра данных Spark. При этом данные извлекаются через API Открытых наборов данных. Мы можем отфильтровать эти данные, чтобы изучить определенное окно данных. В примере кода используется и end_date применяется start_date фильтр, возвращающий один месяц данных.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Этот код сокращает набор данных примерно до 10 000 строк. Чтобы ускорить разработку и обучение, примеры кода теперь вниз по нашему набору данных.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Мы хотим просмотреть наши данные с помощью встроенной display() команды. С помощью этой команды можно легко просмотреть пример данных или графически изучить тенденции в данных.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Подготовка данных

Подготовка данных является важным шагом в процессе машинного обучения. Он включает очистку, преобразование и организацию необработанных данных, чтобы сделать его подходящим для анализа и моделирования. В этом примере кода выполняется несколько действий по подготовке данных:

  • Фильтрация набора данных, чтобы удалить выбросы и неверные значения
  • Удаление столбцов, которые не нужны для обучения модели
  • Создание новых столбцов из необработанных данных
  • Создание метки для определения того, включает ли данная поездка на такси чаевые
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Затем добавьте вторую передачу данных, чтобы добавить окончательные функции.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Создание модели машинного обучения с помощью R

Последняя задача преобразует помеченные данные в формат, который может обрабатывать логистическую регрессию. Входные данные алгоритма логистической регрессии должны иметь структуру пар векторов меток и признаков, где вектор признаков является вектором чисел, представляющих входную точку.

В соответствии с окончательными требованиями задачи необходимо преобразовать категориальные столбцы в числа. В частности, необходимо преобразовать trafficTimeBins столбцы weekdayString в целочисленные представления. У нас есть множество вариантов для обработки этого требования. В этом примере используется OneHotEncoder подход:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Это действие приводит к новому кадру данных со всеми столбцами в правильном формате для обучения модели.

Обучение модели логистической регрессии

Первая задача разбивает набор данных на обучающий набор, а также тестовый или проверяющий набор.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

После того как у нас есть два кадра данных, необходимо создать формулу модели и запустить ее в обучаемом кадре данных. Затем мы можем проверить наличие тестового кадра данных. Поэкспериментируйте с различными версиями формулы модели, чтобы увидеть эффекты различных сочетаний.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Выходные данные ячейки:

Area under ROC = 0.9749430523917996

Создание визуального представления прогноза

Теперь можно создать окончательную визуализацию для интерпретации результатов модели. Кривая ROC может, безусловно, представить результат.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Граф, отображающий кривую ROC для логистической регрессии в модели чаевых.