Руководство: Создание, обучение и оценка модели прироста
В этом руководстве представлен полный пример рабочего процесса Обработки и анализа данных Synapse в Microsoft Fabric. Вы узнаете, как создавать, обучать и оценивать модели подъема и применять методы моделирования подъема.
Необходимые условия
Получите подписку Microsoft Fabric. Или запишитесь на бесплатную пробную версию Microsoft Fabric .
Войдите в Microsoft Fabric.
Используйте переключатель интерфейса в левой нижней части домашней страницы, чтобы перейти на Fabric.
- Знакомство с блокнотами Microsoft Fabric
- Lakehouse для этой записной книжки, чтобы хранить данные в данном примере. Для получения дополнительной информации посетите Добавьте «lakehouse» в записную книжку
Следуйте инструкциям в записной книжке
Вы можете следить в блокноте одним из двух способов:
- Откройте и запустите встроенную записную книжку.
- Отправьте записную книжку из GitHub.
Открытие встроенной записной книжки
Тетрадь моделирования Uplift идет вместе с этим руководством.
Чтобы открыть образец ноутбука для этого руководства, следуйте инструкциям в Подготовьте свою систему для учебных материалов по обработке данных.
Перед началом выполнения кода обязательно подключить lakehouse к ноутбуку.
Импорт записной книжки из GitHub
AIsample — Uplift Modeling.ipynb записная книжка сопровождает это руководство.
Чтобы открыть сопровождающую записную книжку для этого руководства, следуйте инструкциям в Подготовка системы для учебников по обработке и анализу данных, чтобы импортировать записную книжку в вашу рабочую область.
Вы можете создать новый блокнот, если вы предпочитаете скопировать и вставить код с этой страницы.
Не забудьте подключить lakehouse к ноутбуку перед выполнением кода.
Шаг 1. Загрузка данных
Набор данных
Criteo AI Lab создал набор данных. Этот набор данных содержит 13M строк. Каждая строка представляет одного пользователя. Каждая строка имеет 12 функций, индикатор лечения и две двоичные метки, которые включают посещение и преобразование.
- f0 - f11: значения признаков (плотные, плавающие значения)
- участия: был ли пользователь случайным образом выбран для участия (например, реклама) (1 = участие, 0 = контрольная группа)
- преобразования: произошло ли преобразование (например, сделал покупку) у пользователя (двоичная метка)
- визит: произошло ли преобразование (например, совершена покупка) для пользователя (бинарный, метка)
Цитата
- Домашняя страница набора данных: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Для набора данных, используемого для этой записной книжки, требуется ссылка BibTex:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Совет
Определив следующие параметры, эту записную книжку можно легко применить к разным наборам данных.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Импорт библиотек
Перед обработкой необходимо импортировать необходимые библиотеки Spark и SynapseML. Также необходимо импортировать библиотеку визуализации данных, например Seaborn, библиотеку визуализаций данных Python. Библиотека визуализации данных предоставляет высокоуровневый интерфейс для построения визуальных ресурсов на базе DataFrame и массивов данных. Дополнительные сведения о Spark, SynapseMLи Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Скачивание набора данных и отправка в Lakehouse
Этот код загружает общедоступную версию набора данных, а затем сохраняет этот ресурс в Fabric lakehouse.
Важный
Перед запуском записной книжки убедитесь, что вы добавить lakehouse. Сбой этого приведет к ошибке.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Начните запись времени выполнения в этой записной книжке.
# Record the notebook running time
import time
ts = time.time()
Настройка отслеживания экспериментов MLflow
Чтобы расширить возможности ведения журнала MLflow, автоматическое ведение журнала автоматически фиксирует значения входных параметров и выходных метрик модели машинного обучения во время обучения. Затем эти сведения записываются в рабочую область, где API MLflow или соответствующий эксперимент в рабочей области может получить доступ и визуализировать его. Дополнительные сведения об автологе см. в этом ресурсе.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Заметка
Чтобы отключить автологирование Microsoft Fabric в сеансе записной книжки, вызовите mlflow.autolog()
и задайте disable=True
.
Чтение данных из озерохранилища
Чтение необработанных данных из lakehouse раздела "Файлы" и добавление дополнительных столбцов для разных частей даты. Те же сведения используются для создания секционированных разностных таблиц.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Шаг 2. Разведочный анализ данных
Используйте команду display
для просмотра высокоуровневой статистики о наборе данных. Вы также можете отобразить представления диаграмм, чтобы легко визуализировать подмножества набора данных.
display(raw_df.limit(20))
Изучите процент посетителей на сайте, процент пользователей, которые совершают конверсии, и процент посетителей, которые совершают конверсии.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
Анализ указывает, что 4.9% пользователей из группы лечения - пользователей, получающих лечение, или рекламу - посетил интернет-магазин. Только 3.8% пользователей из контрольной группы — пользователи, которые никогда не получали лечения, или которым никогда не предлагали или не выставляли рекламу — сделали то же самое. Кроме того, 0,31% всех пользователей из экспериментальной группы совершили конверсию или покупку, в то время как только 0,19% пользователей из контрольной группы сделали это. В результате конверсия посетителей, совершивших покупку и являвшихся членами экспериментальной группы, составила 6,36%по сравнению с только 5,07%** для пользователей контрольной группы. На основе этих результатов лечение может потенциально улучшить частоту посещений примерно на 1%и конверсию посетителей примерно на 1,3%. Лечение приводит к значительному улучшению.
Шаг 3. Определение модели для обучения
Подготовка обучения и проверка наборов данных
Здесь вы подгоните преобразователь Featurize к кадру данных raw_df
, чтобы извлечь признаки из указанных входных столбцов и вывести их в новый столбец под именем features
.
Результирующий кадр данных хранится в новом кадре данных с именем df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Подготовка наборов данных для обработки и контроля
После создания наборов данных для обучения и тестирования необходимо также сформировать наборы данных для обработки и управления ими, чтобы обучить модели машинного обучения для измерения повышения уровня.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Теперь, когда вы подготовили данные, можно продолжить обучение модели с помощью LightGBM.
Моделирование подъема: T-Learner с LightGBM
Метаучебники — это набор алгоритмов, основанных на алгоритмах машинного обучения, таких как LightGBM, Xgboost и т. д. Они помогают оценить эффект условного среднего лечения или CATE. T-learner — это метаучитель, который не использует одну модель. Вместо этого T-learner использует отдельную модель для каждой переменной лечения. Поэтому разработаны две модели, одну из которых мы называем метаобучающим или T-learner. T-learner использует несколько моделей машинного обучения для решения проблемы с полным игнорированием обработки, заставляя алгоритм сначала учитывать её.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Использование тестового набора данных для прогнозирования
Здесь вы используете treatment_model
и control_model
, определенные ранее, для преобразования тестового набора данных test_df
. Затем вычислите прогнозируемый подъем. Вы определяете прогнозируемый подъем как разницу между прогнозируемым результатом лечения и прогнозируемым результатом контроля. Чем больше прогнозируемое различие в приросте, тем выше эффективность воздействия (например, реклама) для индивидуума или подгруппы.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Выполнение оценки модели
Поскольку реальное увеличение невозможно наблюдать для каждого отдельного человека, необходимо измерять увеличение для группы людей. Вы используете кривую роста, которая отображает реальный совокупный рост в популяции.
Ось x представляет соотношение населения, выбранного для лечения. Значение 0 указывает на отсутствие группы лечения - никто не подвергается лечению и не получает его предложений. Значение 1 предполагает полную группу лечения - каждый получает лечение или ему предлагают его. Ось Y показывает показатель увеличения. Цель состоит в том, чтобы найти размер группы лечения или процент населения, которые будут предложены или подвержены лечению (например, рекламе). Этот подход оптимизирует выбор целевого объекта для оптимизации результата.
Сначала ранжируйте тестовый кадр данных по прогнозируемому приросту. Прогнозируемый подъем — это разница между прогнозируемым результатом лечения и прогнозируемым результатом контроля.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Затем вычислите совокупный процент посещений как в группах лечения, так и в контрольных группах.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Наконец, для каждого процента вычислите прирост как разницу между совокупным процентом посещений между экспериментальной и контрольной группами.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Теперь настройте кривую подъема для прогноза тестового набора данных. Перед отображением необходимо преобразовать кадр данных PySpark в кадр данных Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Ось x представляет соотношение населения, выбранного для лечения. Значение 0 указывает на отсутствие группы лечения — никто не подвергается лечению и лечение никому не предлагается. Значение 1 предполагает, что вся группа лечения получает или ей предлагается лечение. Ось Y показывает показатель улучшения. Цель состоит в том, чтобы найти размер группы лечения или процент населения, которые будут предложены или подвержены лечению (например, рекламе). Этот подход оптимизирует выбор целевого объекта для оптимизации результата.
Во-первых, упорядочьте тестовую таблицу данных по предсказанному увеличению. Прогнозируемый подъем — это разница между прогнозируемым результатом лечения и прогнозируемым результатом контроля.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Затем вычислите совокупный процент посещений как в группах лечения, так и в контрольных группах.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Наконец, при каждом процентном значении вычислите увеличение группы как разницу между накопленным процентом посещений между контрольной и экспериментальной группами.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Теперь настройте кривую подъема для прогноза тестового набора данных. Перед отображением необходимо преобразовать кадр данных PySpark в кадр данных Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Анализ и кривая подъема обе показывают, что первые 20% популяции, согласно прогнозу, получат большую выгоду, если они получат лечение. Это означает, что верхние 20%% населения составляют группу поддающихся убеждению. Таким образом, можно определить оценку отсечения для требуемого размера группы, получающей лечение, в 20%, чтобы определить целевых клиентов для максимального эффекта.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Шаг 4. Регистрация конечной модели машинного обучения
MLflow используется для отслеживания и регистрации всех экспериментов для групп лечения и управления. Это отслеживание и ведение журнала включают соответствующие параметры, метрики и модели. Эти сведения регистрируются под именем эксперимента в рабочей области для последующего использования.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Чтобы просмотреть эксперименты, выполните следующее:
- На левой панели выберите рабочую область.
- Найдите и выберите имя эксперимента, в этом случае aisample-upliftmodelling.
Шаг 5. Сохранение результатов прогнозирования
Microsoft Fabric предлагает PREDICT — масштабируемую функцию, которая поддерживает пакетную оценку в любом вычислительном механизме. Это позволяет клиентам работать с моделями машинного обучения. Пользователи могут создавать пакетные прогнозы прямо из записной книжки или страницы элементов для конкретной модели. Посетите этот ресурс, чтобы узнать больше о PREDICT и узнать, как использовать PREDICT в Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")
Связанное содержимое
- модель машинного обучения в Microsoft Fabric
- Обучение моделей машинного обучения
- эксперименты машинного обучения в Microsoft Fabric