Przewodnik: Tworzenie, trenowanie i ocenianie modelu uplift
Ten samouczek przedstawia pełny przykład przepływu pracy analizy danych usługi Synapse w usłudze Microsoft Fabric. Dowiesz się, jak tworzyć, trenować i oceniać modele uplift oraz stosować techniki modelowania uplift.
Warunki wstępne
Pobierz subskrypcję usługi Microsoft Fabric . Możesz też utworzyć bezpłatne konto wersji próbnej usługi Microsoft Fabric.
Zaloguj się do usługi Microsoft Fabric.
Użyj przełącznika środowiska w lewej dolnej części Twojej strony głównej, aby przełączyć się na Fabric.
- Znajomość notebooków usługi Microsoft Fabric
- Usługa lakehouse dla tego notatnika do przechowywania danych na potrzeby tego przykładu. Aby uzyskać więcej informacji, odwiedź stronę Dodaj lakehouse do swojego notesu
Śledź notatki w notesie
Możesz śledzić w notatniku na jeden z dwóch sposobów:
- Otwórz i uruchom wbudowany notes.
- Prześlij notatnik z GitHub.
Otwieranie wbudowanego notesu
Przykładowy notatnik modelowania uplift towarzyszy temu samouczkowi.
Aby otworzyć przykładowy notes na potrzeby tego samouczka, postępuj zgodnie z instrukcjami w Prepare your system for data science tutorials.
Upewnij się, że dołączysz lakehouse do notesu, zanim zaczniesz uruchamiać kod.
Import notatnika z GitHub
Ten samouczek zawiera notes AIsample — Uplift Modeling.ipynb.
Aby otworzyć towarzyszący notes na potrzeby tego samouczka, postępuj zgodnie z instrukcjami w Prepare your system for data science tutorials, aby zaimportować notes do obszaru roboczego.
Możesz utworzyć nowy notes, jeśli wolisz skopiować i wkleić kod z tej strony.
Pamiętaj, aby dołączyć magazyn typu lakehouse do notesu przed uruchomieniem kodu.
Krok 1. Ładowanie danych
Zbiór danych
Laboratorium Criteo AI lab utworzyło zestaw danych. Ten zestaw danych zawiera 13 mln wierszy. Każdy wiersz reprezentuje jednego użytkownika. Każdy wiersz ma 12 właściwości, wskaźnik leczenia oraz dwie etykiety binarne, w skład których wchodzą wizyta i konwersja.
- f0 – f11: wartości cech (gęste, zmiennoprzecinkowe)
- eksperymentu: czy użytkownik został losowo wybrany do udziału w eksperymencie (na przykład kampanii reklamowej) (1 = eksperyment, 0 = kontrola)
- konwersji: czy nastąpiła konwersja (na przykład dokonano zakupu) dla użytkownika (binarne, etykieta)
- odwiedź: czy doszło do konwersji (na przykład dokonano zakupu) przez użytkownika (binarnie, etykieta)
Cytat
- Strona główna zestawu danych: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Zestaw danych używany na potrzeby tego notatnika wymaga tego cytowania BibTeX.
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Napiwek
Definiując następujące parametry, można łatwo zastosować ten notes w różnych zestawach danych.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Importowanie bibliotek
Przed przetworzeniem należy zaimportować wymagane biblioteki Spark i SynapseML. Należy również zaimportować bibliotekę wizualizacji danych — na przykład Seaborn, bibliotekę wizualizacji danych języka Python. Biblioteka wizualizacji danych udostępnia interfejs wysokiego poziomu umożliwiający tworzenie zasobów wizualnych na obiektach DataFrame i tablicach. Dowiedz się więcej o spark, SynapseML i Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Pobieranie zestawu danych i przekazywanie do usługi Lakehouse
Ten kod pobiera publicznie dostępną wersję zestawu danych, a następnie przechowuje ten zasób danych w usłudze Fabric lakehouse.
Ważny
Przed jego uruchomieniem upewnij się, że dodano lakehouse do notatnika. Niepowodzenie w tym celu spowoduje wystąpienie błędu.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Rozpocznij rejestrowanie czasu działania tego notatnika.
# Record the notebook running time
import time
ts = time.time()
Konfigurowanie śledzenia eksperymentów MLflow
Aby rozszerzyć możliwości rejestrowania MLflow, automatyczne rejestrowanie automatycznie przechwytuje wartości parametrów wejściowych i metryk wyjściowych modelu uczenia maszynowego podczas trenowania. Te informacje są następnie rejestrowane w obszarze roboczym, w którym interfejsy API MLflow lub odpowiedni eksperyment w obszarze roboczym mogą uzyskiwać do niego dostęp i wizualizować je. Odwiedź ten zasób, aby uzyskać więcej informacji na temat automatycznego rejestrowania.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Notatka
Aby wyłączyć automatyczne rejestrowanie usługi Microsoft Fabric w sesji notesu, wywołaj mlflow.autolog()
i ustaw disable=True
.
Odczytywanie danych z jeziora
Odczytaj surowe dane z sekcji Lakehouse Files i dodaj więcej kolumn zawierających różne części daty. Te same informacje są używane do tworzenia partycjonowanej tabeli różnicowej.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Krok 2. Eksploracyjna analiza danych
Użyj polecenia display
, aby wyświetlić ogólne statystyki dotyczące zestawu danych. Możesz również wyświetlić widoki wykresów, aby łatwo wizualizować podzestawy zestawu danych.
display(raw_df.limit(20))
Sprawdź procent użytkowników, którzy odwiedzają, procent użytkowników, którzy konwertują, oraz procent odwiedzających, którzy konwertują.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
Analiza wskazuje, że 4,9% użytkowników z grupy objętej działaniem (użytkowników, którzy otrzymali działanie lub reklamę) odwiedzili sklep internetowy. Tylko 3,8% użytkowników z grupy kontrolnej - użytkownicy, którzy nigdy nie otrzymali leczenia, lub nigdy nie byli oferowani lub narażeni na reklamę - zrobili to samo. Ponadto 0,31% wszystkich użytkowników z grupy badanej przekonwertowało lub dokonało zakupu, podczas gdy tylko 0,19% użytkowników z grupy kontrolnej. W rezultacie współczynnik konwersji odwiedzających, którzy dokonali zakupu, którzy byli również członkami grupy leczenia, jest 6,36%, w porównaniu do tylko 5,07%** dla użytkowników grupy kontrolnej. Na podstawie tych wyników leczenie może potencjalnie poprawić wskaźnik wizyty o około 1%, a współczynnik konwersji odwiedzających o około 1,3%. Leczenie prowadzi do znacznej poprawy.
Krok 3. Definiowanie modelu na potrzeby trenowania
Przygotowywanie trenowania i testowania zestawów danych
W tym miejscu dopasujesz funkcję przekształcania Featurize do raw_df
DataFrame, aby wyodrębnić funkcje z określonych kolumn wejściowych i wyprowadzić te funkcje do nowej kolumny o nazwie features
.
Wynikowa ramka danych jest przechowywana w nowej ramce danych o nazwie df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Przygotowywanie zestawów danych leczenia i kontroli
Po utworzeniu zbiorów danych do trenowania i testowania, należy również utworzyć zbiory danych eksperymentalne i kontrolne, aby wytrenować modele uczenia maszynowego w celu oceny wzrostu efektywności.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Po przygotowaniu danych możesz przejść do trenowania modelu za pomocą rozwiązania LightGBM.
Modelowanie uplift: T-Learner z LightGBM
Metauczeń to zestaw algorytmów, oparty na algorytmach uczenia maszynowego, takich jak LightGBM, Xgboost itp. Pomagają oszacować średni efekt leczenia warunkowego lub CATE. T-learner to meta-uczeń, który nie korzysta z pojedynczego modelu. Zamiast tego T-learner używa jednego modelu na zmienną dotyczącą leczenia. W związku z tym dwa modele są opracowywane i odnosimy się do meta-ucznia jako T-uczeń. Uczenie T wykorzystuje wiele modeli uczenia maszynowego, aby przezwyciężyć problem całkowitego pominięcia leczenia, zmuszając model do uwzględnienia leczenia w pierwszej kolejności.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Korzystanie z zestawu danych testowych na potrzeby przewidywania
W tym miejscu użyjesz treatment_model
i control_model
( zdefiniowanych wcześniej) w celu przekształcenia zestawu danych testowego test_df
. Następnie obliczysz przewidywany wzrost. Przewidywany wzrost jest definiowany jako różnica między przewidywanym wynikiem leczenia a przewidywanym wynikiem kontroli. Im większa przewidywana różnica w podnoszeniu, tym większa skuteczność działania (na przykład reklamy) na osobę indywidualną lub podgrupę.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Przeprowadzanie oceny modelu
Ponieważ rzeczywistego efektu nie można zaobserwować dla każdej osoby, musisz zmierzyć efekt dla grupy osób. Używasz krzywej wzrostu, który przedstawia rzeczywisty, skumulowany wzrost w całej populacji.
Oś x reprezentuje stosunek populacji wybranej do leczenia. Wartość 0 sugeruje brak grupy otrzymującej leczenie - nikt nie jest poddany leczeniu ani któremu oferowane jest leczenie. Wartość 1 sugeruje pełną grupę leczenia - każdemu jest oferowane lub poddawane leczenie. Na osi y pokazana jest miara wzrostu. Celem jest znalezienie wielkości grupy leczenia lub procent populacji, która byłaby oferowana lub narażona na leczenie (na przykład reklamy). To podejście optymalizuje wybór docelowy, aby zoptymalizować wynik.
Najpierw należy sklasyfikować kolejność testowej ramki danych według przewidywanego wzrostu. Przewidywany wzrost jest różnicą między przewidywanym wynikiem leczenia a przewidywanym wynikiem kontroli.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Następnie oblicz skumulowany procent wizyt w grupach leczenia i grup kontrolnych.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Na koniec, przy każdym procentzie, oblicz wzrost grupy jako różnicę między skumulowanym procentem wizyt między grupami leczenia i grup kontrolnych.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Teraz wykreślij krzywą wzrostu dla przewidywania zestawu danych testowych. Przed wykreśleniem należy przekonwertować ramkę danych PySpark na ramkę danych Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Oś x reprezentuje stosunek populacji wybranej do leczenia. Wartość 0 sugeruje brak grupy otrzymującej leczenie - nikt nie jest poddany leczeniu ani któremu oferowane jest leczenie. Wartość 1 sugeruje pełną grupę leczenia - każdemu jest oferowane lub poddawane leczenie. Na osi y pokazana jest miara wzrostu. Celem jest znalezienie wielkości grupy leczenia lub procent populacji, która byłaby oferowana lub narażona na leczenie (na przykład reklamy). To podejście optymalizuje wybór docelowy, aby zoptymalizować wynik.
Najpierw należy sklasyfikować kolejność testowej ramki danych według przewidywanego wzrostu. Przewidywany wzrost jest różnicą między przewidywanym wynikiem leczenia a przewidywanym wynikiem kontroli.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Następnie oblicz skumulowany procent wizyt w grupach leczenia i grup kontrolnych.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Na koniec, przy każdym procentzie, oblicz wzrost grupy jako różnicę między skumulowanym procentem wizyt między grupami leczenia i grup kontrolnych.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Teraz wykreślij krzywą wzrostu dla przewidywania zestawu danych testowych. Przed wykreśleniem należy przekonwertować ramkę danych PySpark na ramkę danych Pandas.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Analiza i krzywa podnoszenia w górę pokazują, że 20 najlepszych% populacji, zgodnie z prognozą, miałoby duży zysk, gdyby otrzymały leczenie. Oznacza to, że pierwsze 20% populacji reprezentuje grupę przekonywalnych. W związku z tym można ustawić próg punktowy dla pożądanego rozmiaru grupy terapeutycznej na 20%, aby zidentyfikować docelowych klientów wybieranych dla maksymalnego wpływu.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Krok 4. Rejestrowanie końcowego modelu uczenia maszynowego
Używasz biblioteki MLflow do śledzenia i rejestrowania wszystkich eksperymentów zarówno dla grup leczenia, jak i grup kontrolnych. To śledzenie i rejestrowanie obejmuje odpowiednie parametry, metryki i modele. Te informacje są rejestrowane pod nazwą eksperymentu w obszarze roboczym do późniejszego użycia.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Aby obejrzeć swoje eksperymenty:
- Na panelu po lewej stronie wybierz swój obszar roboczy.
- Znajdź i wybierz nazwę eksperymentu, w tym przypadku aisample-upliftmodelling.
Krok 5. Zapisywanie wyników przewidywania
Microsoft Fabric oferuje funkcję PREDICT — skalowalną funkcję, która obsługuje ocenianie wsadowe w dowolnym silniku obliczeniowym. Umożliwia klientom operacjonalizacja modeli uczenia maszynowego. Użytkownicy mogą tworzyć przewidywania wsadowe bezpośrednio z notesu lub strony elementu dla określonego modelu. Odwiedź ten zasób, aby dowiedzieć się więcej na temat funkcji PREDICT i dowiedzieć się, jak używać funkcji PREDICT w usłudze Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")