Freigeben über


Tutorial: Erstellen, Auswerten und Bereitstellen eines Empfehlungssystems

Dieses Tutorial stellt ein umfassendes Beispiel für einen Synapse Data Science-Workflow in Microsoft Fabric vor. Das Szenario erstellt ein Modell für Onlinebuchempfehlungen.

Dieses Tutorial umfasst die folgenden Schritte:

  • Hochladen der Daten in ein Lakehouse
  • Durchführen einer explorativen Analyse für die Daten
  • Trainieren eines Modells und Protokollieren des Modells mit MLflow
  • Laden des Modells und Treffen von Vorhersagen

Es stehen uns viele Arten von Empfehlungsalgorithmen zur Verfügung. In diesem Tutorial werden alternierende kleinste Quadrate (ALS) als Algorithmus zur Matrixfaktorisierung verwendet. ALS ist ein modellbasierter kollaborativer Filteralgorithmus.

Screenshot showing a chart of recommendation algorithms types.

Der Mindestquadratalgorithmus versucht, die Bewertungsmatrix R als Produkt von zwei Matrizen mit niedrigerer Rangfolge, U und V, zu schätzen. Hierbei ist R = U * Vt. In der Regel werden diese Näherungen als Faktormatrizen bezeichnet.

Der ALS-Algorithmus ist iterativ. Bei jeder Iteration wird eine der Faktormatrizen konstant gehalten, während die andere nach der Methode der kleinsten Quadrate gelöst wird. Dann wird die neu gelöste Faktormatrix konstant gehalten, während die andere Faktormatrix gelöst wird.

Screenshot of two side-by-side factor matrices.

Voraussetzungen

Notebook für das Tutorial

Für das Notebook können Sie eine der folgenden Optionen wählen:

  • Öffnen Sie das integrierte Notebook in der Synapse Data Science-Umgebung, und führen Sie es aus
  • Laden Sie Ihr Notebook von GitHub in die Synapse Data Science-Umgebung hoch

Öffnen des integrierten Notebooks

Das Beispiel Buchempfehlung ist das Notebook, das dieses Tutorial begleitet.

So öffnen Sie das integrierte Beispiel-Notebook für das Tutorial in der Synapse Data Science-Umgebung:

  1. Wechseln Sie zur Synapse Data Science-Startseite.

  2. Wählen Sie Beispiel verwenden aus.

  3. Wählen Sie das zugehörige Beispiel aus:

    • Wählen Sie es auf der Standardregisterkarte End-to-End-Workflows (Python) aus, wenn es sich bei dem Beispiel um ein Python-Tutorial handelt.
    • Wählen Sie es auf der Registerkarte End-to-End-Workflows (R) aus, wenn es sich bei dem Beispiel um ein R-Tutorial handelt.
    • Wählen Sie es auf der Registerkarte Schnelltutorials aus, wenn es sich bei dem Beispiel um ein Schnelltutorial handelt.
  4. Fügen Sie ein Lakehouse an das Notebook an, bevor Sie mit der Ausführung von Code beginnen.

Importieren des Notebooks von GitHub

AIsample – Book Recommendation.ipynb ist das Notebook, das dieses Tutorial begleitet.

Befolgen Sie zum Öffnen des zugehörigen Notebooks für dieses Tutorial die Anweisungen unter Vorbereiten Ihres Systems für Data Science-Tutorials zum Importieren des Notebooks in Ihren Arbeitsbereich.

Wenn Sie den Code lieber von dieser Seite kopieren und einfügen möchten, können Sie auch ein neues Notebook erstellen.

Fügen Sie unbedingt ein Lakehouse an das Notebook an, bevor Sie mit der Ausführung von Code beginnen.

Schritt 1. Laden der Daten

Das Buchempfehlungsdataset besteht in diesem Szenario aus drei separaten Datasets:

Durch Definieren dieser Parameter können Sie dieses Notebook auf verschiedene Datasets anwenden:

IS_CUSTOM_DATA = False  # If True, the dataset has to be uploaded manually

USER_ID_COL = "User-ID"  # Must not be '_user_id' for this notebook to run successfully
ITEM_ID_COL = "ISBN"  # Must not be '_item_id' for this notebook to run successfully
ITEM_INFO_COL = (
    "Book-Title"  # Must not be '_item_info' for this notebook to run successfully
)
RATING_COL = (
    "Book-Rating"  # Must not be '_rating' for this notebook to run successfully
)
IS_SAMPLE = True  # If True, use only <SAMPLE_ROWS> rows of data for training; otherwise, use all data
SAMPLE_ROWS = 5000  # If IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/book-recommendation/"  # Folder that contains the datasets
ITEMS_FILE = "Books.csv"  # File that contains the item information
USERS_FILE = "Users.csv"  # File that contains the user information
RATINGS_FILE = "Ratings.csv"  # File that contains the rating information

EXPERIMENT_NAME = "aisample-recommendation"  # MLflow experiment name

Herunterladen und Speichern von Daten in einem Lakehouse

Dieser Code lädt das Dataset herunter und speichert es dann im Lakehouse.

Wichtig

Denken Sie daran, ein Lakehouse zum Notebook hinzuzufügen, bevor Sie es ausführen. Andernfalls wird eine Fehlermeldung angezeigt.

if not IS_CUSTOM_DATA:
    # Download data files into a lakehouse if they don't exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset"
    file_list = ["Books.csv", "Ratings.csv", "Users.csv"]
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

Einrichten der MLflow-Experimentnachverfolgung

Verwenden Sie diesen Code, um die MLflow-Experimentnachverfolgung einzurichten. In diesem Beispiel wird die automatische Protokollierung deaktiviert. Weitere Informationen finden Sie im Artikel Automatische Protokollierung in Microsoft Fabric.

# Set up MLflow for experiment tracking
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Lesen von Daten aus dem Lakehouse

Wenn die richtigen Daten im Lakehouse gespeichert wurden, können Sie die drei Datasets in separate Spark-DataFrames im Notebook lesen. Die Dateipfade in diesem Code verwenden die zuvor definierten Parameter.

df_items = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{ITEMS_FILE}")
    .cache()
)

df_ratings = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{RATINGS_FILE}")
    .cache()
)

df_users = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{USERS_FILE}")
    .cache()
)

Schritt 2: Durchführen einer explorativen Datenanalyse

Anzeigen der Rohdaten

Erkunden Sie die DataFrames mit dem Befehl display. Mit diesem Befehl können Sie DataFrame-Statistiken auf hoher Ebene anzeigen und verstehen, wie verschiedenen Spalten in den Datasets miteinander in Beziehung stehen. Bevor Sie die Datasets erkunden, müssen Sie mit diesem Code die erforderlichen Bibliotheken importieren:

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
import matplotlib.pyplot as plt
import seaborn as sns
color = sns.color_palette()  # Adjusting plotting style
import pandas as pd  # DataFrames

Verwenden Sie diesen Code, um den DataFrame zu sehen, der die Buchdaten enthält:

display(df_items, summary=True)

Fügen Sie eine _item_id-Spalte zum späteren Gebrauch hinzu. Der Wert _item_id muss für Empfehlungsmodelle eine ganze Zahl sein. Dieser Code verwendet StringIndexer zum Umwandeln von ITEM_ID_COL in Indizes:

df_items = (
    StringIndexer(inputCol=ITEM_ID_COL, outputCol="_item_id")
    .setHandleInvalid("skip")
    .fit(df_items)
    .transform(df_items)
    .withColumn("_item_id", F.col("_item_id").cast("int"))
)

Zeigen Sie den DataFrame an, und überprüfen Sie, ob der Wert _item_id wie erwartet monoton und nacheinander erweitert wird:

display(df_items.sort(F.col("_item_id").desc()))

Verwenden Sie diesen Code, um die führenden 10 Autoren nach Anzahl der geschriebenen Bücher in absteigender Reihenfolge zu zeichnen. Agatha Christie ist die führende Autorin mit mehr als 600 Büchern, gefolgt von William Shakespeare.

df_books = df_items.toPandas() # Create a pandas DataFrame from the Spark DataFrame for visualization
plt.figure(figsize=(8,5))
sns.countplot(y="Book-Author",palette = 'Paired', data=df_books,order=df_books['Book-Author'].value_counts().index[0:10])
plt.title("Top 10 authors with maximum number of books")

Screenshot showing a graph of the top 10 authors who wrote the highest number of books.

Zeigen Sie als Nächstes den DataFrame an, der die Benutzerdaten enthält:

display(df_users, summary=True)

Wenn in einer Zeile ein User-ID-Wert fehlt, lassen Sie diese Zeile wegfallen. Fehlende Werte in einem benutzerdefinierten Dataset verursachen keine Probleme.

df_users = df_users.dropna(subset=(USER_ID_COL))
display(df_users, summary=True)

Fügen Sie eine _user_id-Spalte zum späteren Gebrauch hinzu. Für Empfehlungsmodelle muss der Wert _user_id eine ganze Zahl sein. Das folgende Codebeispiel verwendet StringIndexer zum Umwandeln von USER_ID_COL in Indizes.

Das Buch-Dataset hat bereits eine ganzzahlige User-ID-Spalte. Das Hinzufügen einer _user_id-Spalte aus Gründen der Kompatibilität verschiedenen Datasets macht dieses Beispiel jedoch robuster. Verwenden Sie diesen Code zum Hinzufügen der _user_id-Spalte:

df_users = (
    StringIndexer(inputCol=USER_ID_COL, outputCol="_user_id")
    .setHandleInvalid("skip")
    .fit(df_users)
    .transform(df_users)
    .withColumn("_user_id", F.col("_user_id").cast("int"))
)
display(df_users.sort(F.col("_user_id").desc()))

Verwenden Sie diesen Code zum Anzeigen der Bewertungsdaten:

display(df_ratings, summary=True)

Rufen Sie die unterschiedlichen Bewertungen ab und speichern Sie sie zur späteren Verwendung in einer Liste namens ratings:

ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]
print(ratings)

Verwenden Sie diesen Code zum Anzeigen der führenden 10 Bücher mit den höchsten Bewertungen:

plt.figure(figsize=(8,5))
sns.countplot(y="Book-Title",palette = 'Paired',data= df_books, order=df_books['Book-Title'].value_counts().index[0:10])
plt.title("Top 10 books per number of ratings")

Den Bewertungen zufolge ist Selected Poems das beliebteste Buch. Adventures of Huckleberry Finn, The Secret Garden und Dracula haben die gleiche Bewertung.

Screenshot showing a graph of the top-rated books.

Daten zusammenführen

Führen Sie die drei Datenframes zu einem Dataframe zusammen, um eine umfassendere Analyse zu ermöglichen:

df_all = df_ratings.join(df_users, USER_ID_COL, "inner").join(
    df_items, ITEM_ID_COL, "inner"
)
df_all_columns = [
    c for c in df_all.columns if c not in ["_user_id", "_item_id", RATING_COL]
]

# Reorder the columns to ensure that _user_id, _item_id, and Book-Rating are the first three columns
df_all = (
    df_all.select(["_user_id", "_item_id", RATING_COL] + df_all_columns)
    .withColumn("id", F.monotonically_increasing_id())
    .cache()
)

display(df_all)

Verwenden Sie diesen Code, um die Zahl der verschiedenen Benutzer, Bücher und Interaktionen anzuzeigen:

print(f"Total Users: {df_users.select('_user_id').distinct().count()}")
print(f"Total Items: {df_items.select('_item_id').distinct().count()}")
print(f"Total User-Item Interactions: {df_all.count()}")

Verwenden Sie diesen Code, um die 10 beliebtesten Bücher zu berechnen und anzuzeigen:

# Compute top popular products
df_top_items = (
    df_all.groupby(["_item_id"])
    .count()
    .join(df_items, "_item_id", "inner")
    .sort(["count"], ascending=[0])
)

# Find top <topn> popular items
topn = 10
pd_top_items = df_top_items.limit(topn).toPandas()
pd_top_items.head(10)

Tipp

Verwenden sie den <topn>-Wert für die Empfehlungsabschnitte Beliebt oder Meistgekauft.

# Plot top <topn> items
f, ax = plt.subplots(figsize=(10, 5))
plt.xticks(rotation="vertical")
sns.barplot(y=ITEM_INFO_COL, x="count", data=pd_top_items)
ax.tick_params(axis='x', rotation=45)
plt.xlabel("Number of Ratings for the Item")
plt.show()

Screenshot of a graph of the most popular books.

Erstellen von Trainings- und Testdatensätzen

Die ALS-Matrix erfordert eine gewisse Datenvorbereitung vor dem Training. Verwenden Sie dieses Codebeispiel zum Vorbereiten der Daten. Das Skript führt folgende Aktionen aus:

  • Umwandlung der Bewertungsspalte in den richtigen Typ
  • Beispiele für die Trainingsdaten mit Benutzerbewertungen
  • Unterteilen der Daten in Trainings- und Testdatasets
if IS_SAMPLE:
    # Must sort by '_user_id' before performing limit to ensure that ALS works normally
    # If training and test datasets have no common _user_id, ALS will fail
    df_all = df_all.sort("_user_id").limit(SAMPLE_ROWS)

# Cast the column into the correct type
df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast("float"))

# Using a fraction between 0 and 1 returns the approximate size of the dataset; for example, 0.8 means 80% of the dataset
# Rating = 0 means the user didn't rate the item, so it can't be used for training
# We use the 80% of the dataset with rating > 0 as the training dataset
fractions_train = {0: 0}
fractions_test = {0: 0}
for i in ratings:
    if i == 0:
        continue
    fractions_train[i] = 0.8
    fractions_test[i] = 1
# Training dataset
train = df_all.sampleBy(RATING_COL, fractions=fractions_train)

# Join with leftanti will select all rows from df_all with rating > 0 and not in the training dataset; for example, the remaining 20% of the dataset
# test dataset
test = df_all.join(train, on="id", how="leftanti").sampleBy(
    RATING_COL, fractions=fractions_test
)

Geringe Datendichte bezieht sich auf spärliche Feedbackdaten, aus denen keine Ähnlichkeiten in den Interessen der Benutzer hervorgehen. Verwenden Sie diesen Code, um die geringe Datendichte des Datasets zu berechnen, damit Sie die Daten und das Problem besser verstehen:

# Compute the sparsity of the dataset
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset - used as numerator
    count_nonzero = ratings.select(RATING_COL).count()
    print(f"Number of rows: {count_nonzero}")

    # Count the total number of distinct user_id and distinct product_id - used as denominator
    total_elements = (
        ratings.select("_user_id").distinct().count()
        * ratings.select("_item_id").distinct().count()
    )

    # Calculate the sparsity by dividing the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings DataFrame is ", "%.4f" % sparsity + "% sparse.")

get_mat_sparsity(df_all)
# Check the ID range
# ALS supports only values in the integer range
print(f"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}")
print(f"max user_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}")

Schritt 3: Entwickeln und Trainieren des Modells

Trainieren Sie ein ALS-Modell, das personalisierte Empfehlungen für Benutzer abgibt.

Definieren des Modells

Spark ML bietet eine praktische API für die Erstellung des ALS-Modells. Das Modell ist jedoch nicht zuverlässig in der Behandlung von Problemen wie geringe Datendichte und Kaltstart (Empfehlungen bei neuen Benutzern oder Artikeln). Um die Leistung des Modells zu verbessern, können Sie die Kreuzvalidierung und automatische Hyperparameteroptimierung miteinander kombinieren.

Verwenden Sie diesen Code, um Bibliotheken zu importieren, die für das Trainieren und Auswerten des Modells erforderlich sind:

# Import Spark required libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# Specify the training parameters
num_epochs = 1  # Number of epochs; here we use 1 to reduce the training time
rank_size_list = [64]  # The values of rank in ALS for tuning
reg_param_list = [0.01, 0.1]  # The values of regParam in ALS for tuning
model_tuning_method = "TrainValidationSplit"  # TrainValidationSplit or CrossValidator
# Build the recommendation model by using ALS on the training data
# We set the cold start strategy to 'drop' to ensure that we don't get NaN evaluation metrics
als = ALS(
    maxIter=num_epochs,
    userCol="_user_id",
    itemCol="_item_id",
    ratingCol=RATING_COL,
    coldStartStrategy="drop",
    implicitPrefs=False,
    nonnegative=True,
)

Optimieren von Modellhyperparametern

Das nächste Codebeispiel erstellt ein Parameterraster, um die Suche über die Hyperparameter zu erleichtern. Der Code erstellt auch eine Regressionsauswertung mit der mittleren quadratischen Gesamtabweichung (RMSE) als Auswertungsmetrik:

#  Construct a grid search to select the best values for the training parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_size_list)
    .addGrid(als.regParam, reg_param_list)
    .build()
)

print("Number of models to be tested: ", len(param_grid))

# Define the evaluator and set the loss function to the RMSE 
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol=RATING_COL, predictionCol="prediction"
)

Das nächste Codebeispiel initiiert verschiedene Methoden zur Modelloptimierung basierend auf den vorkonfigurierten Parametern. Weitere Informationen zur Modelloptimierung finden Sie unter Modelloptimierung: Modellauswahl und Hyperparameteroptimierung auf der Apache Spark-Website.

# Build cross-validation by using CrossValidator and TrainValidationSplit
if model_tuning_method == "CrossValidator":
    tuner = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=5,
        collectSubModels=True,
    )
elif model_tuning_method == "TrainValidationSplit":
    tuner = TrainValidationSplit(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        # 80% of the training data will be used for training; 20% for validation
        trainRatio=0.8,
        collectSubModels=True,
    )
else:
    raise ValueError(f"Unknown model_tuning_method: {model_tuning_method}")

Evaluieren des Modells

Sie sollten Module anhand der Testdaten auswerten. Ein gut trainiertes Modell sollte über hohe Metriken für das Dataset verfügen.

Bei einem überangepassten Modell müssen Sie gegebenenfalls die Größe der Trainingsdaten erhöhen oder einige der redundanten Features reduzieren. Es ist gegebenenfalls eine Änderung der Modellarchitektur oder eine gewisse Feinabstimmung der Parameter erforderlich.

Hinweis

Ein negativer Wert der R-Quadrat-Metrik zeigt an, dass das trainierte Modell schlechter abschneidet als eine horizontale gerade Linie. Dieses Ergebnis deutet darauf hin, dass das trainierte Modell die Daten nicht erklärt.

Verwenden Sie diesen Code zum Definieren einer Auswertungsfunktion:

def evaluate(model, data, verbose=0):
    """
    Evaluate the model by computing rmse, mae, r2, and variance over the data.
    """

    predictions = model.transform(data).withColumn(
        "prediction", F.col("prediction").cast("double")
    )

    if verbose > 1:
        # Show 10 predictions
        predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(
            10
        ).show()

    # Initialize the regression evaluator
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
    rmse = _evaluator("rmse")
    mae = _evaluator("mae")
    r2 = _evaluator("r2")
    var = _evaluator("var")

    if verbose > 0:
        print(f"RMSE score = {rmse}")
        print(f"MAE score = {mae}")
        print(f"R2 score = {r2}")
        print(f"Explained variance = {var}")

    return predictions, (rmse, mae, r2, var)

Nachverfolgen des Experiments mit MLflow

Verwenden Sie MLflow, um alle Experimente zu verfolgen und Parameter, Metriken und Modelle zu protokollieren. Verwenden Sie diesen Code zum Starten von Modelltraining und Auswertung:

from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name="als"):
    # Train models
    models = tuner.fit(train)
    best_metrics = {"RMSE": 10e6, "MAE": 10e6, "R2": 0, "Explained variance": 0}
    best_index = 0
    # Evaluate models
    # Log models, metrics, and parameters
    for idx, model in enumerate(models.subModels):
        with mlflow.start_run(nested=True, run_name=f"als_{idx}") as run:
            print("\nEvaluating on test data:")
            print(f"subModel No. {idx + 1}")
            predictions, (rmse, mae, r2, var) = evaluate(model, test, verbose=1)

            signature = infer_signature(
                train.select(["_user_id", "_item_id"]),
                predictions.select(["_user_id", "_item_id", "prediction"]),
            )
            print("log model:")
            mlflow.spark.log_model(
                model,
                f"{EXPERIMENT_NAME}-alsmodel",
                signature=signature,
                registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
                dfs_tmpdir="Files/spark",
            )
            print("log metrics:")
            current_metric = {
                "RMSE": rmse,
                "MAE": mae,
                "R2": r2,
                "Explained variance": var,
            }
            mlflow.log_metrics(current_metric)
            if rmse < best_metrics["RMSE"]:
                best_metrics = current_metric
                best_index = idx

            print("log parameters:")
            mlflow.log_params(
                {
                    "subModel_idx": idx,
                    "num_epochs": num_epochs,
                    "rank_size_list": rank_size_list,
                    "reg_param_list": reg_param_list,
                    "model_tuning_method": model_tuning_method,
                    "DATA_FOLDER": DATA_FOLDER,
                }
            )
    # Log the best model and related metrics and parameters to the parent run
    mlflow.spark.log_model(
        models.subModels[best_index],
        f"{EXPERIMENT_NAME}-alsmodel",
        signature=signature,
        registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
        dfs_tmpdir="Files/spark",
    )
    mlflow.log_metrics(best_metrics)
    mlflow.log_params(
        {
            "subModel_idx": idx,
            "num_epochs": num_epochs,
            "rank_size_list": rank_size_list,
            "reg_param_list": reg_param_list,
            "model_tuning_method": model_tuning_method,
            "DATA_FOLDER": DATA_FOLDER,
        }
    )

Wählen Sie in Ihrem Arbeitsbereich das Experiment mit dem Namen aisample-recommendation aus, um die protokollierten Informationen für die Trainingsausführung anzuzeigen. Wenn Sie den Experimentnamen geändert haben, wählen Sie das Experiment mit dem neuen Namen aus. Die protokollierten Informationen sehen ähnlich aus wie in dieser Abbildung:

Screenshot of the experiment logs.

Schritt 4: Laden des endgültigen Modells zum Bewerten und Treffen von Vorhersagen

Wenn Sie das Modelltraining beendet und das beste Modell ausgewählt haben, laden Sie das Modell für die Bewertung (manchmal auch als Rückschlüsse bezeichnet). Dieser Code lädt das Modell und verwendet Vorhersagen, um die 10 besten Bücher für jeden Benutzer zu empfehlen:

# Load the best model
# MLflow uses PipelineModel to wrap the original model, so we extract the original ALSModel from the stages
model_uri = f"models:/{EXPERIMENT_NAME}-alsmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark").stages[-1]

# Generate top 10 book recommendations for each user
userRecs = loaded_model.recommendForAllUsers(10)

# Represent the recommendations in an interpretable format
userRecs = (
    userRecs.withColumn("rec_exp", F.explode("recommendations"))
    .select("_user_id", F.col("rec_exp._item_id"), F.col("rec_exp.rating"))
    .join(df_items.select(["_item_id", "Book-Title"]), on="_item_id")
)
userRecs.limit(10).show()

Die Ausgabe sieht ähnlich aus wie in dieser Tabelle:

_item_id _user_id rating Book-Title
44865 7 7.9996786 Lasher: Lives of ...
786 7 6.2255826 The Piano Man's D...
45330 7 4.980466 State of Mind
38960 7 4.980466 All He Ever Wanted
125415 7 4.505084 Harry Potter and ...
44939 7 4.3579073 Taltos: Lives of ...
175247 7 4.3579073 The Bonesetter's ...
170183 7 4.228735 Living the Simple...
88503 7 4.221206 Island of the Blu...
32894 7 3.9031885 Winter Solstice

Speichern der Vorhersagen im Lakehouse

Verwenden Sie diesen Code, um die Empfehlungen zurück in das Lakehouse zu schreiben:

# Code to save userRecs into the lakehouse
userRecs.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/userRecs"
)