Opplæring: Opprette, lære opp og evaluere en opphevingsmodell
Denne opplæringen presenterer et ende-til-ende-eksempel på en Synapse Data Science-arbeidsflyt i Microsoft Fabric. Du lærer hvordan du oppretter, lærer opp og evaluerer oppløftende modeller og bruker oppløftende modelleringsteknikker.
Forutsetninger
Få et Microsoft Fabric-abonnement. Eller registrer deg for en gratis Prøveversjon av Microsoft Fabric.
Logg på Microsoft Fabric.
Bruk opplevelsesbryteren nederst til venstre på hjemmesiden for å bytte til Fabric.
- Kjennskap til Microsoft Fabric-notatblokker
- Et lakehouse for denne notatblokken for å lagre data for dette eksemplet. Hvis du vil ha mer informasjon, kan du gå til Legge til et lakehouse i notatblokken
Følg med i en notatblokk
Du kan følge med i en notatblokk på én av to måter:
- Åpne og kjør den innebygde notatblokken.
- Last opp notatblokken fra GitHub.
Åpne den innebygde notatblokken
Eksemplet på Opphevingsmodellering notatblokk følger med denne opplæringen.
Hvis du vil åpne eksempelnotatblokken for denne opplæringen, følger du instruksjonene i Klargjør systemet for.
Pass på å feste et lakehouse til notatblokken før du begynner å kjøre kode.
Importere notatblokken fra GitHub
Den AIsample - Uplift Modeling.ipynb notatblokken følger med denne opplæringen.
Hvis du vil åpne den medfølgende notatblokken for denne opplæringen, følger du instruksjonene i Klargjør systemet for opplæringer i datavitenskap, for å importere notatblokken til arbeidsområdet.
Du kan opprette en ny notatblokk hvis du heller vil kopiere og lime inn koden fra denne siden.
Pass på å feste et lakehouse til notatblokken før du begynner å kjøre kode.
Trinn 1: Laste inn dataene
Dataset
Criteo AI Lab opprettet datasettet. Datasettet har 13 M rader. Hver rad representerer én bruker. Hver rad har 12 funksjoner, en behandlingsindikator og to binære etiketter som inkluderer besøk og konvertering.
- f0 – f11: funksjonsverdier (tette, flytende verdier)
- behandling: om en bruker var tilfeldig mål for behandling (for eksempel reklame) (1 = behandling, 0 = kontroll)
- konvertering: om det oppstod en konvertering (for eksempel et kjøp) for en bruker (binær, etikett)
- gå til: om det oppstod en konvertering (for eksempel kjøp) for en bruker (binær, etikett)
Sitat
- Hjemmeside for datasett: https://ailab.criteo.com/criteo-uplift-prediction-dataset/
Datasettet som brukes for denne notatblokken, krever dette BibTex-sitatet:
@inproceedings{Diemert2018,
author = {{Diemert Eustache, Betlei Artem} and Renaudin, Christophe and Massih-Reza, Amini},
title={A Large Scale Benchmark for Uplift Modeling},
publisher = {ACM},
booktitle = {Proceedings of the AdKDD and TargetAd Workshop, KDD, London,United Kingdom, August, 20, 2018},
year = {2018}
}
Tips
Ved å definere følgende parametere kan du enkelt bruke denne notatblokken på forskjellige datasett.
IS_CUSTOM_DATA = False # If True, the user must upload the dataset manually
DATA_FOLDER = "Files/uplift-modelling"
DATA_FILE = "criteo-research-uplift-v2.1.csv"
# Data schema
FEATURE_COLUMNS = [f"f{i}" for i in range(12)]
TREATMENT_COLUMN = "treatment"
LABEL_COLUMN = "visit"
EXPERIMENT_NAME = "aisample-upliftmodelling" # MLflow experiment name
Importere biblioteker
Før du behandler, må du importere nødvendige Spark- og SynapseML-biblioteker. Du må også importere et datavisualiseringsbibliotek – for eksempel Seaborn, et python-datavisualiseringsbibliotek. Et datavisualiseringsbibliotek gir et grensesnitt på høyt nivå for å bygge visuelle ressurser på DataFrames og matriser. Lær mer om Spark, SynapseMLog Seaborn.
import os
import gzip
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.style as style
import seaborn as sns
%matplotlib inline
from synapse.ml.featurize import Featurize
from synapse.ml.core.spark import FluentAPI
from synapse.ml.lightgbm import *
from synapse.ml.train import ComputeModelStatistics
import mlflow
Laste ned et datasett og laste opp til lakehouse
Denne koden laster ned en offentlig tilgjengelig versjon av datasettet, og lagrer deretter dataressursen i et Fabric Lakehouse.
Viktig
Kontroller at du Legg til et lakehouse- i notatblokken før du kjører den. Hvis du ikke gjør dette, vil det føre til en feil.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "http://go.criteo.net/criteo-research-uplift-v2.1.csv.gz"
download_file = "criteo-research-uplift-v2.1.csv.gz"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found, please add a lakehouse and restart the session.")
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{DATA_FILE}"):
r = requests.get(f"{remote_url}", timeout=30)
with open(f"{download_path}/{download_file}", "wb") as f:
f.write(r.content)
with gzip.open(f"{download_path}/{download_file}", "rb") as fin:
with open(f"{download_path}/{DATA_FILE}", "wb") as fout:
fout.write(fin.read())
print("Downloaded demo data files into lakehouse.")
Begynn å spille inn kjøretiden for denne notatblokken.
# Record the notebook running time
import time
ts = time.time()
Konfigurere sporing av MLflow-eksperimenter
Hvis du vil utvide loggingsfunksjonene for MLflow, registreres verdiene for inndataparametere og utdatadata for en maskinlæringsmodell automatisk under opplæringen. Denne informasjonen logges deretter til arbeidsområdet, der MLflow-API-ene eller det tilsvarende eksperimentet i arbeidsområdet kan få tilgang til og visualisere det. Gå til denne ressursen for mer informasjon om autologging.
# Set up the MLflow experiment
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
Notat
Hvis du vil deaktivere Automatisk tillogging av Microsoft Fabric i en notatblokkøkt, ringer du mlflow.autolog()
og angir disable=True
.
Les data fra lakehouse
Les rådata fra lakehouse Files-delen og legg til flere kolonner for ulike datodeler. Den samme informasjonen brukes til å opprette en partisjonert deltatabell.
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True).cache()
Trinn 2: Utforskende dataanalyse
Bruk kommandoen display
til å vise statistikk på høyt nivå om datasettet. Du kan også vise diagramvisningene for enkelt å visualisere delsett av datasettet.
display(raw_df.limit(20))
Undersøk prosentandelen av brukerne som besøker, prosentandelen av brukere som konverterer, og prosentandelen av de besøkende som konverterer.
raw_df.select(
F.mean("visit").alias("Percentage of users that visit"),
F.mean("conversion").alias("Percentage of users that convert"),
(F.sum("conversion") / F.sum("visit")).alias("Percentage of visitors that convert"),
).show()
Analysen indikerer at 4,9% av brukere fra behandlingsgruppen - brukere som fikk behandling, eller reklame - besøkte nettbutikken. Bare 3,8% av brukere fra kontrollgruppen - brukere som aldri fikk behandlingen, eller aldri ble tilbudt eller utsatt for reklame - gjorde det samme. I tillegg konverterte 0,31% av alle brukere fra behandlingsgruppen, eller gjorde et kjøp - mens bare 0,19% av brukere fra kontrollgruppen gjorde det. Som et resultat er konverteringsfrekvensen for besøkende som gjorde et kjøp, som også var medlemmer av behandlingsgruppen, 6,36%, sammenlignet med bare 5,07%** for brukere av kontrollgruppen. Basert på disse resultatene kan behandlingen potensielt forbedre besøksraten med ca. 1%, og konverteringsfrekvensen for besøkende med ca. 1,3%. Behandlingen fører til en betydelig forbedring.
Trinn 3: Definere modellen for opplæring
Klargjøre opplæringen og teste datasettene
Her får du plass til en Featurize-transformator til raw_df
DataFrame, for å trekke ut funksjoner fra de angitte inndatakolonnene og sende disse funksjonene til en ny kolonne med navnet features
.
Den resulterende DataFrame lagres i en ny DataFrame med navnet df
.
transformer = Featurize().setOutputCol("features").setInputCols(FEATURE_COLUMNS).fit(raw_df)
df = transformer.transform(raw_df)
# Split the DataFrame into training and test sets, with a 80/20 ratio and a seed of 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Print the training and test dataset sizes
print("Size of train dataset: %d" % train_df.count())
print("Size of test dataset: %d" % test_df.count())
# Group the training dataset by the treatment column, and count the number of occurrences of each value
train_df.groupby(TREATMENT_COLUMN).count().show()
Klargjøre behandlings- og kontrolldatasettene
Når du har opprettet opplærings- og testdatasettene, må du også danne datasettene for behandling og kontroll for å lære opp maskinlæringsmodellene for å måle økningen.
# Extract the treatment and control DataFrames
treatment_train_df = train_df.where(f"{TREATMENT_COLUMN} > 0")
control_train_df = train_df.where(f"{TREATMENT_COLUMN} = 0")
Nå som du har forberedt dataene, kan du fortsette å lære opp en modell med LightGBM.
Oppløftende modellering: T-Learner med LightGBM
Meta-elever er et sett med algoritmer, bygget på toppen av maskinlæringsalgoritmer som LightGBM, Xgboost og så videre. De bidrar til å estimere betinget gjennomsnittlig behandlingseffekt, eller CATE. T-eleven er en meta-elev som ikke bruker én enkelt modell. I stedet bruker T-eleven én modell per behandlingsvariabel. Derfor utvikles to modeller, og vi refererer til meta-eleven som T-elev. T-eleven bruker flere maskinlæringsmodeller for å overvinne problemet med å fullstendig forkaste behandlingen, ved å tvinge eleven til først å dele på den.
mlflow.autolog(exclusive=False)
classifier = (
LightGBMClassifier(dataTransferMode="bulk")
.setFeaturesCol("features") # Set the column name for features
.setNumLeaves(10) # Set the number of leaves in each decision tree
.setNumIterations(100) # Set the number of boosting iterations
.setObjective("binary") # Set the objective function for binary classification
.setLabelCol(LABEL_COLUMN) # Set the column name for the label
)
# Start a new MLflow run with the name "uplift"
active_run = mlflow.start_run(run_name="uplift")
# Start a new nested MLflow run with the name "treatment"
with mlflow.start_run(run_name="treatment", nested=True) as treatment_run:
treatment_run_id = treatment_run.info.run_id # Get the ID of the treatment run
treatment_model = classifier.fit(treatment_train_df) # Fit the classifier on the treatment training data
# Start a new nested MLflow run with the name "control"
with mlflow.start_run(run_name="control", nested=True) as control_run:
control_run_id = control_run.info.run_id # Get the ID of the control run
control_model = classifier.fit(control_train_df) # Fit the classifier on the control training data
Bruk testdatasettet for en prognose
Her bruker du treatment_model
og control_model
, begge definert tidligere, til å transformere test_df
testdatasettet. Deretter beregner du den forventede økningen. Du definerer den forventede økningen som forskjellen mellom det forventede behandlingsresultatet og det forventede kontrollresultatet. Jo større denne forventede økningsforskjellen er, jo større er effektiviteten av behandlingen (for eksempel reklame) på en person eller en undergruppe.
getPred = F.udf(lambda v: float(v[1]), FloatType())
# Cache the resulting DataFrame for easier access
test_pred_df = (
test_df.mlTransform(treatment_model)
.withColumn("treatment_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.mlTransform(control_model)
.withColumn("control_pred", getPred("probability"))
.drop("rawPrediction", "probability", "prediction")
.withColumn("pred_uplift", F.col("treatment_pred") - F.col("control_pred"))
.select(TREATMENT_COLUMN, LABEL_COLUMN, "treatment_pred", "control_pred", "pred_uplift")
.cache()
)
# Display the first twenty rows of the resulting DataFrame
display(test_pred_df.limit(20))
Utfør modellevaluering
Siden faktisk økning ikke kan observeres for hver enkelt person, må du måle økningen over en gruppe individer. Du bruker en oppløftende kurve som tegner inn den reelle, kumulative økningen i hele populasjonen.
X-aksen representerer forholdet mellom populasjonen som er valgt for behandlingen. En verdi på 0 antyder ingen behandlingsgruppe - ingen blir utsatt for, eller tilbys, behandlingen. En verdi på 1 antyder en full behandlingsgruppe - alle blir utsatt for, eller tilbys, behandlingen. Y-aksen viser hevingsmålet. Målet er å finne størrelsen på behandlingsgruppen, eller prosentandelen av befolkningen som vil bli tilbudt eller utsatt for behandlingen (for eksempel reklame). Denne tilnærmingen optimaliserer målvalget for å optimalisere resultatet.
Først rangerer du testen DataFrame-rekkefølgen etter den forventede økningen. Den forventede økningen er forskjellen mellom det forventede behandlingsresultatet og det forventede kontrollresultatet.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Deretter beregner du den kumulative prosentandelen av besøk i både behandlings- og kontrollgruppene.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Til slutt beregner du økningen i gruppen som forskjellen mellom den kumulative prosentandelen av besøk mellom behandlings- og kontrollgruppene.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Nå kan du tegne inn hevingskurven for testdatasettprognosen. Du må konvertere PySpark DataFrame til en Pandas DataFrame før du planlegger.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
X-aksen representerer forholdet mellom populasjonen som er valgt for behandlingen. En verdi på 0 antyder ingen behandlingsgruppe - ingen blir utsatt for, eller tilbys, behandlingen. En verdi på 1 antyder en full behandlingsgruppe - alle blir utsatt for, eller tilbys, behandlingen. Y-aksen viser hevingsmålet. Målet er å finne størrelsen på behandlingsgruppen, eller prosentandelen av befolkningen som vil bli tilbudt eller utsatt for behandlingen (for eksempel reklame). Denne tilnærmingen optimaliserer målvalget for å optimalisere resultatet.
Først rangerer du testen DataFrame-rekkefølgen etter den forventede økningen. Den forventede økningen er forskjellen mellom det forventede behandlingsresultatet og det forventede kontrollresultatet.
# Compute the percentage rank of the predicted uplift values in descending order, and display the top twenty rows
test_ranked_df = test_pred_df.withColumn("percent_rank", F.percent_rank().over(Window.orderBy(F.desc("pred_uplift"))))
display(test_ranked_df.limit(20))
Deretter beregner du den kumulative prosentandelen av besøk i både behandlings- og kontrollgruppene.
# Calculate the number of control and treatment samples
C = test_ranked_df.where(f"{TREATMENT_COLUMN} == 0").count()
T = test_ranked_df.where(f"{TREATMENT_COLUMN} != 0").count()
# Add columns to the DataFrame to calculate the control and treatment cumulative sum
test_ranked_df = (
test_ranked_df.withColumn(
"control_label",
F.when(F.col(TREATMENT_COLUMN) == 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"treatment_label",
F.when(F.col(TREATMENT_COLUMN) != 0, F.col(LABEL_COLUMN)).otherwise(0),
)
.withColumn(
"control_cumsum",
F.sum("control_label").over(Window.orderBy("percent_rank")) / C,
)
.withColumn(
"treatment_cumsum",
F.sum("treatment_label").over(Window.orderBy("percent_rank")) / T,
)
)
# Display the first 20 rows of the dataframe
display(test_ranked_df.limit(20))
Til slutt beregner du økningen i gruppen som forskjellen mellom den kumulative prosentandelen av besøk mellom behandlings- og kontrollgruppene.
test_ranked_df = test_ranked_df.withColumn("group_uplift", F.col("treatment_cumsum") - F.col("control_cumsum")).cache()
display(test_ranked_df.limit(20))
Nå kan du tegne inn hevingskurven for testdatasettprognosen. Du må konvertere PySpark DataFrame til en Pandas DataFrame før du planlegger.
def uplift_plot(uplift_df):
"""
Plot the uplift curve
"""
gain_x = uplift_df.percent_rank
gain_y = uplift_df.group_uplift
# Plot the data
fig = plt.figure(figsize=(10, 6))
mpl.rcParams["font.size"] = 8
ax = plt.plot(gain_x, gain_y, color="#2077B4", label="Normalized Uplift Model")
plt.plot(
[0, gain_x.max()],
[0, gain_y.max()],
"--",
color="tab:orange",
label="Random Treatment",
)
plt.legend()
plt.xlabel("Porportion Targeted")
plt.ylabel("Uplift")
plt.grid()
return fig, ax
test_ranked_pd_df = test_ranked_df.select(["pred_uplift", "percent_rank", "group_uplift"]).toPandas()
fig, ax = uplift_plot(test_ranked_pd_df)
mlflow.log_figure(fig, "UpliftCurve.png")
Analysen og opphevingskurven viser begge at de 20 beste% befolkningen, som rangert etter prognosen, ville ha en stor gevinst hvis de fikk behandlingen. Dette betyr at de 20 beste% i populasjonen representerer den overbevisende gruppen. Derfor kan du deretter angi cutoff-poengsummen for ønsket størrelse på behandlingsgruppen på 20%, for å identifisere målvalgkundene for størst effekt.
cutoff_percentage = 0.2
cutoff_score = test_ranked_pd_df.iloc[int(len(test_ranked_pd_df) * cutoff_percentage)][
"pred_uplift"
]
print("Uplift scores that exceed {:.4f} map to Persuadables.".format(cutoff_score))
mlflow.log_metrics(
{"cutoff_score": cutoff_score, "cutoff_percentage": cutoff_percentage}
)
Trinn 4: Registrere den endelige ML-modellen
Du bruker MLflow til å spore og logge alle eksperimenter for både behandlings- og kontrollgrupper. Denne sporingen og loggingen inkluderer tilsvarende parametere, måledata og modellene. Denne informasjonen logges under eksperimentnavnet, i arbeidsområdet, for senere bruk.
# Register the model
treatment_model_uri = "runs:/{}/model".format(treatment_run_id)
mlflow.register_model(treatment_model_uri, f"{EXPERIMENT_NAME}-treatmentmodel")
control_model_uri = "runs:/{}/model".format(control_run_id)
mlflow.register_model(control_model_uri, f"{EXPERIMENT_NAME}-controlmodel")
mlflow.end_run()
Slik viser du eksperimentene dine:
- Velg arbeidsområdet i venstre panel.
- Finn og velg eksperimentnavnet, i dette tilfellet aisample-upliftmodelling.
Trinn 5: Lagre prognoseresultatene
Microsoft Fabric tilbyr PREDICT – en skalerbar funksjon som støtter satsvis poengsum i enhver databehandlingsmotor. Det gjør det mulig for kunder å operasjonalisere maskinlæringsmodeller. Brukere kan opprette satsvise prognoser direkte fra en notatblokk eller elementsiden for en bestemt modell. Gå til denne ressursen for å lære mer om FORUTSI, og lær hvordan du bruker PREDICT i Microsoft Fabric.
# Load the model back
loaded_treatmentmodel = mlflow.spark.load_model(treatment_model_uri, dfs_tmpdir="Files/spark")
loaded_controlmodel = mlflow.spark.load_model(control_model_uri, dfs_tmpdir="Files/spark")
# Make predictions
batch_predictions_treatment = loaded_treatmentmodel.transform(test_df)
batch_predictions_control = loaded_controlmodel.transform(test_df)
batch_predictions_treatment.show(5)
# Save the predictions in the lakehouse
batch_predictions_treatment.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_treatment"
)
batch_predictions_control.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/batch_predictions_control"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")