다음을 통해 공유


자습서: 업리프트 모델 생성, 학습 및 평가

이 자습서에서는 Microsoft Fabric에서 Synapse 데이터 과학 워크플로의 엔드투엔드 예시를 제공합니다. 업리프트 모델을 만들고, 학습하고, 평가하고, 업리프트 모델링 기술을 적용하는 방법을 알아봅니다.

필수 조건

Notebook에서 따라 하기

다음 두 가지 방법 중 하나로 Notebook에서 따라 할 수 있습니다.

  • 기본 제공 Notebook을 열고 실행합니다.
  • GitHub에서 Notebook을 업로드합니다.

기본 제공 Notebook 열기

이 자습서에는 샘플 업리프트 모델링 Notebook이 함께 제공됩니다.

  1. 이 자습서의 샘플 노트북을 열려면, 데이터 과학 자습서를 위한 시스템 준비의 지침을 따르십시오.

  2. 코드 실행을 시작하기 전에 Notebook lakehouse를 연결해야 합니다.

GitHub에서 Notebook 가져오기

이 자습서에는 샘플 AIsample - Uplift Modeling.ipynb Notebook이 함께 제공됩니다.

이 자습서에 함께 제공되는 Notebook을 열려면 데이터 과학 자습서를 위한 시스템 준비의 지침에 따라 Notebook을 작업 영역으로 가져옵니다.

이 페이지에서 코드를 복사하여 붙여넣으려는 경우 새 Notebook을 만들 수 있습니다.

코드 실행을 시작하기 전에 Notebook에 레이크하우스를 연결해야 합니다.

1단계: 데이터 로드

데이터 세트

Criteo AI Lab에서 데이터 세트를 만들었습니다. 해당 데이터 세트에는 1,300만 개 행이 있습니다. 각 행은 1명의 사용자를 나타냅니다. 각 행에는 12개의 기능, 처리 표시기 및 방문과 전환을 포함하는 두 개의 이진 레이블이 있습니다.

Criteo AI Lab 데이터 세트 구조를 보여 주는 스크린샷.

  • f0 - f11: 기능 값(고밀도 부동 값)
  • 처리: 사용자가 무작위로 처리 대상이 되었는지 여부(예: 광고)(1 = 처리, 0 = 제어)
  • 전환: 사용자에 대한 전환(예: 구매)이 발생했는지 여부(이진, 레이블)
  • 방문: 사용자에 대한 전환(예: 구매)이 발생했는지 여부(이진, 레이블)

인용

이 Notebook에 사용된 데이터 세트에는 다음 BibTex 인용이 필요합니다.

@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}

다음 매개 변수를 정의하여 이 Notebook을 다른 데이터 세트에 쉽게 적용할 수 있습니다.

IS_CUSTOM_DATA = False  # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"

# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"

EXPERIMENT_NAME = "aisample-upliftmodelling"  # MLflow experiment name

라이브러리 가져오기

처리하기 전에 필요한 Spark 및 SynapseML 라이브러리를 가져와야 합니다. 또한 데이터 시각화 라이브러리(예: Python 데이터 시각화 라이브러리인 Seaborn)를 가져와야 합니다. 데이터 시각화 라이브러리는 DataFrames 및 배열에 시각적 리소스를 빌드할 수 있는 고급 인터페이스를 제공합니다. Spark, SynapseML, Seaborn에 대해 자세히 알아봅니다.

import os
import gzip

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

import numpy as np
import pandas as pd

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns

%matplotlib inline

from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics

import mlflow

데이터 세트 다운로드 및 레이크하우스에 업로드

이 코드는 공개적으로 사용 가능한 버전의 데이터 세트를 다운로드한 다음 해당 데이터 리소스를 Fabric 레이크하우스에 저장합니다.

중요 사항

실행하기 전에 Notebook에 레이크하우스를 추가해야 합니다. 그렇게 하지 않으면 오류가 발생합니다.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
    download_file = "criteo-research-uplift-v2.1.csv.gz"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{DATA_FILE}"):
        r = requests.get(f"{remote_url}", timeout=30)
        with open(f"{download_path}/{download_file}", "wb") as f:
            f.write(r.content)
        with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
            with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
                fout.write(fin.read())
    print("Downloaded demo data files into lakehouse.")

이 Notebook의 런타임 기록을 시작합니다.

# Record the notebook running time
import time

ts = time.time()

MLflow 실험 추적 설정

MLflow 로깅 기능을 확장하기 위해, 자동 로깅은 학습 중에 기계 학습 모델의 입력 매개 변수 및 출력 메트릭 값을 자동으로 캡처합니다. 그런 다음 이 정보는 작업 영역에 기록되며, MLflow API 또는 작업 영역의 해당 실험에서 액세스하여 시각화할 수 있습니다. 자동 로깅에 대한 자세한 내용을 보려면 이 리소스를 방문하세요.

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

참고 항목

Notebook 세션에서 Microsoft Fabric 자동 로깅을 사용하지 않도록 설정하려면 mlflow.autolog()을 호출하고 disable=True를 설정합니다.

레이크하우스에서 데이터 읽기

레이크하우스 파일 섹션에서 원시 데이터를 읽고 다른 날짜 부분에 대한 열을 더 추가합니다. 분할된 델타 테이블을 만드는 데도 동일한 정보가 사용됩니다.

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()

2단계: 탐색적 데이터 분석

display 명령을 사용하여 데이터 세트에 대한 개략적인 통계를 볼 수 있습니다. 차트 보기를 표시하여 데이터 세트의 하위 집합을 쉽게 시각화할 수도 있습니다.

display(raw_df.limit(20))

방문하는 사용자의 비율, 전환하는 사용자의 비율, 전환하는 방문자의 비율을 검사합니다.

raw_df.select(
    F.mean("visit").alias("Percentage of users that visit"),
    F.mean("conversion").alias("Percentage of users that convert"),
    (F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()

분석에 따르면 처리군(처리 또는 광고를 받은 사용자)의 4.9%가 온라인 스토어를 방문한 것으로 나타났습니다. 대조군 사용자(처리를 전혀 받지 않았거나, 광고를 제공받거나 노출된 적이 없는 사용자)의 3.8%만이 온라인 스토어를 방문했습니다. 또한, 처리군의 모든 사용자 중 0.31%가 전환하거나 구매했지만, 대조군의 사용자는 단지 0.19 %만이 전환하거나 구매를 했습니다. 그 결과, 구매를 한 방문자 중 처리군 구성원인 경우 구매 전환율은 6.36%인 반면, 대조군 사용자의 경우 5.07%**에 불과했습니다. 이 결과에 근거하여, 해당 처리를 통해 잠재적으로 대략 1%의 방문율을 향상할 수 있고, 방문자의 전환율은 대략 1.3% 향상할 수 있습니다. 이 처리는 상당한 개선으로 이어집니다.

3단계: 학습을 위한 모델 정의

학습 및 테스트용 데이터 준비

여기서는 raw_df DataFrame에 기능화 변환기를 맞춰서 지정한 입력 열에서 기능을 추출하고 해당 기능을 features이라는 이름의 새 열에 출력합니다.

결과 DataFrame은 df라는 이름의 새 DataFrame에 저장됩니다.

transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())

# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()

처리 및 대조 데이터 세트 준비

학습 및 테스트 데이터 세트를 만든 후에는 처리 및 대조 데이터 세트도 구성하여 기계 학습 모델을 학습시켜 업리프트를 측정하도록 해야 합니다.

# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")

이제 데이터를 준비했으므로 LightGBM을 통해 모델 학습을 진행할 수 있습니다.

업리프트 모델링: LightGBM을 통한 T-Learner

메타 학습자는 LightGBM, Xgboost 등과 같은 기계 학습 알고리즘을 기반으로 하는 알고리즘 집합입니다. 이는 조건부 평균 처리 효과 또는 CATE를 추정하는 데 도움이 됩니다. T-learner는 단일 모델을 사용하지 않는 메타 학습자입니다. 대신 T-learner는 처리 변수당 하나의 모델을 사용합니다. 따라서 두 가지 모델이 개발되었으며 메타 학습자를 T-learner라고 합니다. T-learner는 여러 기계 학습 모델을 사용하여 학습자가 먼저 처리를 분할하도록 강요함으로써 처리를 완전히 폐기하는 문제를 극복합니다.

mlflow.autolog(exclusive=False)
classifier = (
    LightGBMClassifier(dataTransferMode="bulk")
    .setFeaturesCol("features")  # Set the column name for features
    .setNumLeaves(10)  # Set the number of leaves in each decision tree
    .setNumIterations(100)  # Set the number of boosting iterations
    .setObjective("binary")  # Set the objective function for binary classification
    .setLabelCol(LABEL_COLUMN)  # Set the column name for the label
)

# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")

# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
    treatment_run_id = treatment_run.info.run_id  # Get the ID of the treatment run
    treatment_model = classifier.fit(treatment_train_df)  # Fit the classifier on the treatment training data

# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
    control_run_id = control_run.info.run_id  # Get the ID of the control run
    control_model = classifier.fit(control_train_df)  # Fit the classifier on the control training data
     

예측을 위해 테스트 데이터 세트 사용

여기서는 앞에서 정의한 treatment_modelcontrol_model 둘 다 사용하여 test_df 테스트 데이터 세트를 변환합니다. 그런 다음 예측된 업리프트를 계산합니다. 예측된 업리프트는 예측된 처리 결과와 예측된 대조 결과의 차이로 정의합니다. 이렇게 예측된 업리프트 차이가 클수록 개인 또는 하위 그룹에 대한 처리(예: 광고)의 효과가 커집니다.

getPred = F.udf(lambda v: float(v[1]), FloatType())

# Cache the resulting DataFrame for easier access
test_pred_df = (
    test_df.mlTransform(treatment_model)
    .withColumn("treatment_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .mlTransform(control_model)
    .withColumn("control_pred", getPred("probability"))
    .drop("rawPrediction", "probability", "prediction")
    .withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
    .select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
    .cache()
)

# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))

모델 평가 수행

각 개인에 대해 실제 업리프트를 관찰할 수 없으므로 여러 개인으로 구성된 그룹을 대상으로 업리프트를 측정해야 합니다. 모집단 전체의 실제 누적 업리프트를 표시하는 업리프트 곡선을 사용합니다.

정규화된 업리프트 모델 곡선과 임의 처리를 비교한 차트의 스크린샷.

x축은 처리에 대해 선택한 인구의 비율을 나타냅니다. 0의 값은 처리군이 없음을 나타냅니다. 아무도 처리에 노출되지 않고, 처리를 제공받지 않습니다. 1의 값은 전체 처리군을 나타냅니다. 모든 사람이 처리에 노출되거나, 처리를 제공받습니다. y축은 업리프트 측정값을 보여줍니다. 목표는 처리군의 규모 또는 처리를 제공하거나 노출할 인구의 비율을 찾는 것입니다(예: 광고). 이 접근 방식은 결과를 최적화하기 위해 대상 선택을 최적화합니다.

먼저, 예측된 업리프트를 기준으로 테스트 DataFrame 순서의 순위를 지정합니다. 예측된 업리프트는 예측된 처리 결과와 예측된 대조 결과의 차이입니다.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

다음으로, 처리군과 대조군 모두에서 누적 방문율을 계산합니다.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

마지막으로, 각 백분율에서, 처리군과 대조군 간 누적 방문율의 차이로 그룹의 업리프트를 계산합니다.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

이제 테스트 데이터 세트 예측에 대한 업리프트 곡선을 그립니다. 그리기 전에 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

정규화된 업리프트 모델 곡선과 임의 처리를 비교한 차트의 스크린샷.

x축은 처리에 대해 선택한 인구의 비율을 나타냅니다. 0의 값은 처리군이 없음을 나타냅니다. 아무도 처리에 노출되지 않고, 처리를 제공받지 않습니다. 1의 값은 전체 처리군을 나타냅니다. 모든 사람이 처리에 노출되거나, 처리를 제공받습니다. y축은 업리프트 측정값을 보여줍니다. 목표는 처리군의 규모 또는 처리를 제공하거나 노출할 인구의 비율을 찾는 것입니다(예: 광고). 이 접근 방식은 결과를 최적화하기 위해 대상 선택을 최적화합니다.

먼저, 예측된 업리프트를 기준으로 테스트 DataFrame 순서의 순위를 지정합니다. 예측된 업리프트는 예측된 처리 결과와 예측된 대조 결과의 차이입니다.

# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))

display(test_ranked_df.limit(20))

다음으로, 처리군과 대조군 모두에서 누적 방문율을 계산합니다.

# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()

# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
    test_ranked_df.withColumn(
        "control_label",
        F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "treatment_label",
        F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
    )
    .withColumn(
        "control_cumsum",
        F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
    )
    .withColumn(
        "treatment_cumsum",
        F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
    )
)

# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))

마지막으로, 각 백분율에서, 처리군과 대조군 간 누적 방문율의 차이로 그룹의 업리프트를 계산합니다.

test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))

이제 테스트 데이터 세트 예측에 대한 업리프트 곡선을 그립니다. 그리기 전에 PySpark DataFrame을 Pandas DataFrame으로 변환해야 합니다.

def uplift_plot(uplift_df):
    """
    Plot the uplift curve
    """
    gain_x = uplift_df.percent_rank
    gain_y = uplift_df.group_uplift
    # Plot the data
    fig = plt.figure(figsize=(10, 6))
    mpl.rcParams["font.size"] = 8

    ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")

    plt.plot(
        [0, gain_x.max()],
        [0, gain_y.max()],
        "--",
        color="tab:orange",
        label="Random Treatment",
    )
    plt.legend()
    plt.xlabel("Porportion Targeted")
    plt.ylabel("Uplift")
    plt.grid()

    return fig, ax


test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)

mlflow.log_figure(fig, "UpliftCurve.png")

정규화된 업리프트 모델 곡선과 임의 처리를 비교한 차트의 스크린샷.

분석 결과와 업리프트 곡선은 모두 예측에 따라 순위가 매겨진 상위 20% 인구가 처리를 받는 경우에 큰 이득이 있을 것임을 보여줍니다. 즉, 인구의 상위 20%가 설득력 있는 그룹을 대표함을 의미합니다. 따라서 처리군의 원하는 규모에 대한 컷오프 점수를 20%로 설정하여 가장 큰 영향을 미치는 대상 선택 고객을 식별할 수 있습니다.

cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
    "pred_uplift"
]

print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
    {"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)

4단계: 최종 ML 모델 등록

MLflow를 사용하여 처리군과 대조군 모두에 대한 모든 실험을 추적하고 기록합니다. 이 추적 및 로깅에는 해당 매개 변수, 메트릭 및 모델이 포함됩니다. 이 정보는 나중에 사용할 수 있도록 작업 영역에 실험 이름으로 기록됩니다.

# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")

control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")

mlflow.end_run()

실험을 보려면 다음을 수행합니다.

  1. 왼쪽 패널에서 작업 영역을 선택합니다.
  2. 실험 이름을 찾아서 선택합니다(이 경우 aisample-upliftmodelling).

aisample 업리프트 모델링 실험 결과를 보여 주는 스크린샷.

5단계: 예측 결과 저장

Microsoft Fabric은 모든 컴퓨팅 엔진에서 일괄 처리 채점을 지원하는 확장 가능한 기능인 PREDICT를 제공합니다. 이를 통해 고객은 기계 학습 모델을 운영할 수 있습니다. 사용자는 Notebook 또는 특정 모델에 대한 항목 페이지에서 바로 일괄 처리 예측을 만들 수 있습니다. PREDICT에 대해 자세히 알아보고 Microsoft Fabric에서 PREDICT를 사용하는 방법을 알아보려면 이 리소스를 방문하세요.

# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")

# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")