Selvstudium: Opret, oplær og evaluer en opløftningsmodel
I dette selvstudium præsenteres et eksempel fra ende til anden på en Synapse Data Science-arbejdsproces i Microsoft Fabric. Du lærer, hvordan du opretter, oplærer og evaluerer opløftningsmodeller og anvender opløftende modelleringsteknikker.
Forudsætninger
Få et Microsoft Fabric-abonnement. Du kan også tilmelde dig en gratis Microsoft Fabric-prøveversion.
Log på Microsoft Fabric.
Brug oplevelsesskifteren nederst til venstre på startsiden til at skifte til Fabric.
- Kendskab til Microsoft Fabric-notesbøger
- Et lakehouse til denne notesbog til lagring af data for dette eksempel. Du kan finde flere oplysninger Føj et lakehouse til din notesbog
Følg med i en notesbog
Du kan følge med i en notesbog på to måder:
- Åbn og kør den indbyggede notesbog.
- Upload din notesbog fra GitHub.
Åbn den indbyggede notesbog
Eksemplet Uplift-modellering notesbog følger med dette selvstudium.
Hvis du vil åbne eksempelnotesbogen til dette selvstudium, skal du følge vejledningen i Forbered dit system til selvstudier om datavidenskab.
Sørg for at vedhæfte et lakehouse til notesbogen, før du begynder at køre kode.
Importér notesbogen fra GitHub
Den AIsample – Uplift Modeling.ipynb notesbog følger med dette selvstudium.
Hvis du vil åbne den medfølgende notesbog til dette selvstudium, skal du følge vejledningen i Forbered dit system til selvstudier om datavidenskabfor at importere notesbogen til dit arbejdsområde.
Du kan oprette en ny notesbog, hvis du hellere vil kopiere og indsætte koden fra denne side.
Sørg for at vedhæfte et lakehouse til notesbogen, før du begynder at køre kode.
Trin 1: Indlæs dataene
Datasæt
Criteo AI Lab oprettede datasættet. Dette datasæt har 13.000.000 rækker. Hver række repræsenterer én bruger. Hver række har 12 funktioner, en behandlingsindikator og to binære mærkater, der omfatter besøg og konvertering.
- f0 - f11: funktionsværdier (tætte, flydende værdier)
- behandling: hvorvidt en bruger tilfældigt var målrettet mod behandling (f.eks. reklame) (1 = behandling, 0 = kontrol)
- konvertering: om en konvertering fandt sted (f.eks. foretaget et køb) for en bruger (binær, etiket)
- besøg: om en konvertering fandt sted (f.eks. foretaget et køb) for en bruger (binær, etiket)
Citat
- Startside for datasæt: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Det datasæt, der bruges til denne notesbog, kræver dette BibTex-citat:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Drikkepenge
Ved at definere følgende parametre kan du nemt anvende denne notesbog på forskellige datasæt.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Importér biblioteker
Før du behandler, skal du importere de påkrævede Spark- og SynapseML-biblioteker. Du skal også importere et datavisualiseringsbibliotek, f.eks. Seaborn, et Python-datavisualiseringsbibliotek. Et datavisualiseringsbibliotek indeholder en grænseflade på højt niveau til oprettelse af visuelle ressourcer på DataFrames og matrixer. Få mere at vide om Spark, SynapseMLog Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Download et datasæt, og upload til lakehouse
Denne kode downloader en offentligt tilgængelig version af datasættet og gemmer derefter denne dataressource i et Fabric lakehouse.
Vigtig
Sørg for, at du Føj et lakehouse- til notesbogen, før du kører den. Hvis du ikke gør det, medfører det en fejl.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Begynd at optage kørselstidspunktet for denne notesbog.
# Record the notebook running time
import time
ts = time.time()
Konfigurer sporing af MLflow-eksperiment
Hvis du vil udvide MLflow-logføringsfunktionerne, registrerer automatisk logføring automatisk værdierne for inputparametre og outputmetrik for en model til maskinel indlæring under oplæringen. Disse oplysninger logføres derefter i arbejdsområdet, hvor MLflow-API'erne eller det tilsvarende eksperiment i arbejdsområdet kan få adgang til og visualisere dem. Besøg denne ressource for at få flere oplysninger om automatisk logning.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Seddel
Hvis du vil deaktivere automatisk logning af Microsoft Fabric i en notesbogsession, skal du kalde mlflow.autolog()
og angive disable=True
.
Læs data fra lakehouse
Læs rådata fra lakehouse-afsnittet Filer, og tilføj flere kolonner til forskellige datodele. De samme oplysninger bruges til at oprette en partitioneret deltatabel.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Trin 2: Udforskning af dataanalyse
Brug kommandoen display
til at få vist statistikker på højt niveau om datasættet. Du kan også få vist diagramvisninger for nemt at visualisere undersæt af datasættet.
display(raw_df.limit(20))
Undersøg procentdelen af de brugere, der besøger, procentdelen af brugere, der konverterer, og procentdelen af de besøgende, der konverterer.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
Analysen viser, at 4,9% af brugere fra behandlingsgruppen – brugere, der modtog behandlingen eller reklamer – besøgte onlinebutikken. Kun 3,8% af brugere fra kontrolgruppen - brugere, der aldrig fik behandlingen, eller aldrig blev tilbudt eller udsat for reklamer - gjorde det samme. Derudover konverterede eller foretog 0,31% af alle brugere fra behandlingsgruppen et køb – mens kun 0,19% af brugere fra kontrolgruppen gjorde det. Derfor er konverteringsfrekvensen for besøgende, der foretog et køb, som også var medlem af behandlingsgruppen, 6,36%, sammenlignet med kun 5,07%** for brugere af kontrolgruppen. På baggrund af disse resultater kan behandlingen potentielt forbedre besøgsfrekvensen med ca. 1%og konverteringsfrekvensen for besøgende med ca. 1,3%. Behandlingen fører til en betydelig forbedring.
Trin 3: Definer modellen til oplæring
Forbered oplæringen, og test datasættene
Her kan du tilpasse en featurize-transformer til raw_df
DataFrame for at udtrække funktioner fra de angivne inputkolonner og sende disse funktioner til en ny kolonne med navnet features
.
Den resulterende DataFrame gemmes i en ny DataFrame med navnet df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Forbered behandling og kontrol af datasæt
Når du har oprettet trænings- og testdatasættene, skal du også danne behandlings- og kontroldatasæt for at oplære modellerne til maskinel indlæring for at måle opløftning.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Nu, hvor du har forberedt dine data, kan du fortsætte med at oplære en model med LightGBM.
Opløftning af modellering: T-Learner med LightGBM
Meta-learners er et sæt algoritmer, der er bygget oven på algoritmer til maskinel indlæring, f.eks. LightGBM, Xgboost osv. De hjælper med at estimere betinget gennemsnitlig behandlingseffekt eller CATE-. T-learner er en meta-learner, der ikke bruger en enkelt model. I stedet bruger T-learner én model pr. behandlingsvariabel. Derfor udvikles to modeller, og vi henviser til meta-learneren som T-learner. T-learner bruger flere modeller til maskinel indlæring til at løse problemet med helt at kassere behandlingen ved at tvinge den lærende til først at opdele den.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Brug testdatasættet til en forudsigelse
Her kan du bruge treatment_model
og control_model
, der begge er defineret tidligere, til at transformere det test_df
testdatasæt. Derefter beregner du den forudsagte opløftning. Du definerer den forudsagte opløftning som forskellen mellem det forudsagte behandlingsresultat og det forudsagte kontrolresultat. Jo større denne forudsagte opløftning forskel, jo større effektiviteten af behandlingen (for eksempel reklamer) på en person eller en undergruppe.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Udfør modelevaluering
Da faktiske opløftning ikke kan overholdes for hver enkelt person, skal du måle opløftning over en gruppe af personer. Du bruger en Uplift Curve, der afbilder den reelle, akkumulerede opløftning på tværs af befolkningen.
X-aksen repræsenterer forholdet mellem den population, der er valgt til behandling. En værdi på 0 antyder ingen behandlingsgruppe - ingen udsættes for eller tilbydes behandlingen. En værdi på 1 tyder på en fuld behandling gruppe - alle er udsat for, eller tilbydes, behandlingen. Y-aksen viser opløftningsmålingen. Målet er at finde størrelsen af behandlingsgruppen eller den procentdel af befolkningen, der ville blive tilbudt eller eksponeret for behandlingen (f.eks. reklame). Denne fremgangsmåde optimerer målvalget for at optimere resultatet.
Først skal du rangere testens DataFrame-rækkefølge efter den forudsagte opløftning. Den forudsagte opløftning er forskellen mellem det forudsagte behandlingsresultat og det forudsagte kontrolresultat.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Derefter skal du beregne den akkumulerede procentdel af besøg i både behandlings- og kontrolgrupper.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Endelig skal du ved hver procentdel beregne gruppens opløftning som forskellen mellem den akkumulerede procentdel af besøg mellem behandlings- og kontrolgrupperne.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Afbild nu opløftningskurven for forudsigelsen af testdatasættet. Du skal konvertere PySpark DataFrame til en Pandas DataFrame, før du afbilder.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
X-aksen repræsenterer forholdet mellem den population, der er valgt til behandling. En værdi på 0 antyder ingen behandlingsgruppe - ingen udsættes for eller tilbydes behandlingen. En værdi på 1 tyder på en fuld behandling gruppe - alle er udsat for, eller tilbydes, behandlingen. Y-aksen viser opløftningsmålingen. Målet er at finde størrelsen af behandlingsgruppen eller den procentdel af befolkningen, der ville blive tilbudt eller eksponeret for behandlingen (f.eks. reklame). Denne fremgangsmåde optimerer målvalget for at optimere resultatet.
Først skal du rangere testens DataFrame-rækkefølge efter den forudsagte opløftning. Den forudsagte opløftning er forskellen mellem det forudsagte behandlingsresultat og det forudsagte kontrolresultat.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Derefter skal du beregne den akkumulerede procentdel af besøg i både behandlings- og kontrolgrupper.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Endelig skal du ved hver procentdel beregne gruppens opløftning som forskellen mellem den akkumulerede procentdel af besøg mellem behandlings- og kontrolgrupperne.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Afbild nu opløftningskurven for forudsigelsen af testdatasættet. Du skal konvertere PySpark DataFrame til en Pandas DataFrame, før du afbilder.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Analysen og opløftningskurven viser begge, at de øverste 20% befolkningstal, som rangeret efter forudsigelsen, ville have en stor gevinst, hvis de modtog behandlingen. Det betyder, at de øverste 20% af befolkningen repræsenterer den persuadables gruppe. Derfor kan du derefter angive afskæringsresultatet for den ønskede størrelse af behandlingsgruppen til 20%for at identificere de kunder, der udvælger målet, for den største effekt.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Trin 4: Registrer den endelige ML-model
Du bruger MLflow til at spore og logføre alle eksperimenter for både behandlings- og kontrolgrupper. Denne sporing og logføring omfatter de tilsvarende parametre, målepunkter og modellerne. Disse oplysninger logføres under eksperimentnavnet i arbejdsområdet til senere brug.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Sådan får du vist dine eksperimenter:
- Vælg dit arbejdsområde i venstre panel.
- Find og vælg eksperimentnavnet, i dette tilfælde aisample-upliftmodelling.
Trin 5: Gem forudsigelsesresultaterne
Microsoft Fabric tilbyder PREDICT – en skalerbar funktion, der understøtter batchscore i ethvert beregningsprogram. Det giver kunderne mulighed for at anvende modeller til maskinel indlæring. Brugerne kan oprette batchforudsigelser direkte fra en notesbog eller elementsiden for en bestemt model. Besøg denne ressource for at få mere at vide om PREDICT og for at få mere at vide om, hvordan du bruger PREDICT i Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")