Partager via


Tutoriel : créer, évaluer et noter un système de recommandation

Ce tutoriel présente un exemple de bout en bout d’un flux de travail science des données Synapse dans Microsoft Fabric. Le scénario crée un modèle pour les recommandations de livres en ligne.

Ce didacticiel couvre ces étapes :

  • Charger les données dans un lakehouse
  • Effectuer une analyse exploratoire des données
  • Effectuer l’apprentissage du modèle et le journaliser avec MLflow
  • Charger le modèle et effectuer des prédictions

Nous avons de nombreux types d’algorithmes de recommandation disponibles. Ce tutoriel utilise l’algorithme de factorisation de matrice ALS (Alter Least Squares). ALS est un algorithme de filtrage collaboratif basé sur un modèle.

Screenshot showing a chart of recommendation algorithms types.

ALS tente d’estimer la matrice d’évaluation R comme produit de deux matrices de rang inférieur, vous et V. Ici, R = U * Vt. En règle générale, ces approximations sont appelées matrices de facteurs.

L’algorithme ALS est itératif. Chaque itération maintient l’une des matrices factorielles constante, tout en résolvant l’autre à l’aide de la méthode des moindres carrés. Il maintient ensuite la matrice factorielle nouvellement résolue constante pendant qu’il résout l’autre matrice factorielle.

Screenshot of two side-by-side factor matrices.

Prérequis

Suivez l’évolution dans un notebook

Vous pouvez choisir l’une de ces options pour suivre l’évolution de la situation dans un notebook :

  • Ouvrez et exécutez le notebook intégré dans l’expérience science des données Synapse
  • Chargez votre notebook à partir de GitHub vers l’expérience science des données Synapse

Ouvrir le notebook intégré

L’exemple de notebook sur la recommandation de livre accompagne ce tutoriel.

Pour ouvrir l’exemple de notebook intégré au tutoriel dans l’expérience science des données Synapse :

  1. Accédez à la page d’accueil science des données Synapse.

  2. Sélectionnez Utiliser un échantillon.

  3. Sélectionnez l’échantillon correspondant :

    • À partir de l’onglet par défaut Workflows de bout en bout (Python), si l’exemple concerne un tutoriel Python.
    • À partir de l’onglet Workflows de bout en bout (R), si l’exemple concerne un tutoriel R.
    • À partir de l’onglet Tutoriels rapides, si l’exemple concerne un tutoriel rapide.
  4. Attachez un lakehouse au notebook avant de commencer à exécuter le code.

Importer le notebook à partir de GitHub

Le notebook AIsample - Recommandation de livre.ipynb accompagne ce tutoriel.

Pour ouvrir le notebook d’accompagnement de ce tutoriel, suivez les instructions fournies dans Préparer votre système pour la science des données afin d’importer le notebook dans votre espace de travail.

Si vous préférez copier et coller le code de cette page, vous pouvez créer un nouveau notebook.

Assurez-vous d’attacher un lakehouse au notebook avant de commencer à exécuter du code.

Étape 1 : Chargement des données

Dans ce scénario, le jeu de données de recommandation de livres se compose de trois jeux de données distincts :

Définissez ces paramètres afin que vous puissiez utiliser ce notebook avec différents jeux de données :

IS_CUSTOM_DATA = False  # If True, the dataset has to be uploaded manually

USER_ID_COL = "User-ID"  # Must not be '_user_id' for this notebook to run successfully
ITEM_ID_COL = "ISBN"  # Must not be '_item_id' for this notebook to run successfully
ITEM_INFO_COL = (
    "Book-Title"  # Must not be '_item_info' for this notebook to run successfully
)
RATING_COL = (
    "Book-Rating"  # Must not be '_rating' for this notebook to run successfully
)
IS_SAMPLE = True  # If True, use only <SAMPLE_ROWS> rows of data for training; otherwise, use all data
SAMPLE_ROWS = 5000  # If IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/book-recommendation/"  # Folder that contains the datasets
ITEMS_FILE = "Books.csv"  # File that contains the item information
USERS_FILE = "Users.csv"  # File that contains the user information
RATINGS_FILE = "Ratings.csv"  # File that contains the rating information

EXPERIMENT_NAME = "aisample-recommendation"  # MLflow experiment name

Télécharger et stocker les données dans un lakehouse

Ce code télécharge le jeu de données, puis les stocke dans le lakehouse.

Important

Assurez-vous d’ajouter un lakehouse au notebook avant de l’exécuter. Sinon, une erreur s’affiche.

if not IS_CUSTOM_DATA:
    # Download data files into a lakehouse if they don't exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset"
    file_list = ["Books.csv", "Ratings.csv", "Users.csv"]
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

Configurer le suivi des expériences MLflow

Utilisez ce code pour mettre en place le suivi de l’expérience MLflow. Cet exemple désactive l’autologging. Pour plus d’informations, consultez l’article Autologging dans Microsoft Fabric.

# Set up MLflow for experiment tracking
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Lire des données à partir du lakehouse

Une fois que les données correctes ont été placées dans la maison du lac, lisez les trois jeux de données dans des DataFrames Spark distinctes dans le notebook. Les chemins d’accès aux fichiers dans ce code utilisent les paramètres définis précédemment.

df_items = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{ITEMS_FILE}")
    .cache()
)

df_ratings = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{RATINGS_FILE}")
    .cache()
)

df_users = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{USERS_FILE}")
    .cache()
)

Étape 2 : Effectuer une analyse exploratoire des données

Afficher les données brutes

Explorez les DataFrames à l’aide de la commande display. Cette commande vous permet d’afficher des statistiques de haut niveau sur les DataFrame et de comprendre comment les différentes colonnes du jeu de données sont liées les unes aux autres. Avant d’explorer les jeux de données, utilisez ce code pour importer les bibliothèques nécessaires :

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
import matplotlib.pyplot as plt
import seaborn as sns
color = sns.color_palette()  # Adjusting plotting style
import pandas as pd  # DataFrames

Utilisez ce code pour consulter le DataFrame qui contient les données du livre :

display(df_items, summary=True)

Ajoutez une colonne _item_id pour une utilisation ultérieure. La valeur _item_id doit être un entier pour les modèles de recommandation. Ce code utilise StringIndexer pour transformer ITEM_ID_COL en indices :

df_items = (
    StringIndexer(inputCol=ITEM_ID_COL, outputCol="_item_id")
    .setHandleInvalid("skip")
    .fit(df_items)
    .transform(df_items)
    .withColumn("_item_id", F.col("_item_id").cast("int"))
)

Affichez le DataFrame et vérifiez si la valeur _item_id augmente de façon monotone et successive, comme prévu :

display(df_items.sort(F.col("_item_id").desc()))

Utilisez ce code pour tracer les 10 premiers auteurs, par nombre de livres écrits, dans l’ordre décroissant. Agatha Christie est le premier auteur avec plus de 600 livres, suivi de William Shakespeare.

df_books = df_items.toPandas() # Create a pandas DataFrame from the Spark DataFrame for visualization
plt.figure(figsize=(8,5))
sns.countplot(y="Book-Author",palette = 'Paired', data=df_books,order=df_books['Book-Author'].value_counts().index[0:10])
plt.title("Top 10 authors with maximum number of books")

Screenshot showing a graph of the top 10 authors who wrote the highest number of books.

Ensuite, affichez le DataFrame qui contient les données utilisateur :

display(df_users, summary=True)

Si une ligne a une valeur User-ID manquante, supprimez cette ligne. Les valeurs manquantes dans un jeu de données personnalisé ne causent pas de problèmes.

df_users = df_users.dropna(subset=(USER_ID_COL))
display(df_users, summary=True)

Ajoutez une colonne _user_id pour une utilisation ultérieure. Pour les modèles de recommandation, la valeur _user_id doit être un entier. L’exemple de code suivant utilise StringIndexer pour transformer USER_ID_COL en index.

Le jeu de données de livre possède déjà une colonne User-ID entière. Toutefois, l’ajout d’une colonne _user_id pour la compatibilité avec différents jeux de données rend cet exemple plus robuste. Utilisez ce code pour ajouter la colonne _user_id :

df_users = (
    StringIndexer(inputCol=USER_ID_COL, outputCol="_user_id")
    .setHandleInvalid("skip")
    .fit(df_users)
    .transform(df_users)
    .withColumn("_user_id", F.col("_user_id").cast("int"))
)
display(df_users.sort(F.col("_user_id").desc()))

Utilisez ce code pour afficher les données d’évaluation :

display(df_ratings, summary=True)

Obtenez les notes distinctes et enregistrez-les pour une utilisation ultérieure dans une liste nommée ratings :

ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]
print(ratings)

Utilisez ce code pour afficher les 10 premiers livres avec les évaluations les plus élevées :

plt.figure(figsize=(8,5))
sns.countplot(y="Book-Title",palette = 'Paired',data= df_books, order=df_books['Book-Title'].value_counts().index[0:10])
plt.title("Top 10 books per number of ratings")

Selon les évaluations, Selected Poems est le livre le plus populaire. Les Aventures de Huckleberry Finn, Le Jardin secret et Dracula ont la même note.

Screenshot showing a graph of the top-rated books.

Fusionner les données

Fusionnez les trois DataFrames en un seul DataFrame pour une analyse plus complète :

df_all = df_ratings.join(df_users, USER_ID_COL, "inner").join(
    df_items, ITEM_ID_COL, "inner"
)
df_all_columns = [
    c for c in df_all.columns if c not in ["_user_id", "_item_id", RATING_COL]
]

# Reorder the columns to ensure that _user_id, _item_id, and Book-Rating are the first three columns
df_all = (
    df_all.select(["_user_id", "_item_id", RATING_COL] + df_all_columns)
    .withColumn("id", F.monotonically_increasing_id())
    .cache()
)

display(df_all)

Utilisez ce code pour afficher le nombre d’utilisateurs, de livres et d’interactions distincts :

print(f"Total Users: {df_users.select('_user_id').distinct().count()}")
print(f"Total Items: {df_items.select('_item_id').distinct().count()}")
print(f"Total User-Item Interactions: {df_all.count()}")

Utilisez ce code pour calculer et afficher les 10 livres les plus populaires :

# Compute top popular products
df_top_items = (
    df_all.groupby(["_item_id"])
    .count()
    .join(df_items, "_item_id", "inner")
    .sort(["count"], ascending=[0])
)

# Find top <topn> popular items
topn = 10
pd_top_items = df_top_items.limit(topn).toPandas()
pd_top_items.head(10)

Conseil

Utilisez la valeur <topn> pour les sections Recommandations Populaires ou recommandations Les plus achetées.

# Plot top <topn> items
f, ax = plt.subplots(figsize=(10, 5))
plt.xticks(rotation="vertical")
sns.barplot(y=ITEM_INFO_COL, x="count", data=pd_top_items)
ax.tick_params(axis='x', rotation=45)
plt.xlabel("Number of Ratings for the Item")
plt.show()

Screenshot of a graph of the most popular books.

Préparation de jeux de données d’apprentissage et de test

La matrice ALS nécessite une préparation des données avant la formation. Utilisez cet exemple de code pour préparer les données. Le code effectue les actions suivantes :

  • Attribuez le type correct à la colonne d’évaluation
  • Échantillonnez les données d’apprentissage avec des évaluations utilisateur
  • Fractionner les données en jeux de données d’entraînement et de test.
if IS_SAMPLE:
    # Must sort by '_user_id' before performing limit to ensure that ALS works normally
    # If training and test datasets have no common _user_id, ALS will fail
    df_all = df_all.sort("_user_id").limit(SAMPLE_ROWS)

# Cast the column into the correct type
df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast("float"))

# Using a fraction between 0 and 1 returns the approximate size of the dataset; for example, 0.8 means 80% of the dataset
# Rating = 0 means the user didn't rate the item, so it can't be used for training
# We use the 80% of the dataset with rating > 0 as the training dataset
fractions_train = {0: 0}
fractions_test = {0: 0}
for i in ratings:
    if i == 0:
        continue
    fractions_train[i] = 0.8
    fractions_test[i] = 1
# Training dataset
train = df_all.sampleBy(RATING_COL, fractions=fractions_train)

# Join with leftanti will select all rows from df_all with rating > 0 and not in the training dataset; for example, the remaining 20% of the dataset
# test dataset
test = df_all.join(train, on="id", how="leftanti").sampleBy(
    RATING_COL, fractions=fractions_test
)

La rareté fait référence à la densité des données de retour d’expérience, qui ne permet pas d’identifier les similitudes entre les intérêts des utilisateurs. Pour une meilleure compréhension des données et du problème actuel, utilisez ce code pour calculer la densité de jeu de données :

# Compute the sparsity of the dataset
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset - used as numerator
    count_nonzero = ratings.select(RATING_COL).count()
    print(f"Number of rows: {count_nonzero}")

    # Count the total number of distinct user_id and distinct product_id - used as denominator
    total_elements = (
        ratings.select("_user_id").distinct().count()
        * ratings.select("_item_id").distinct().count()
    )

    # Calculate the sparsity by dividing the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings DataFrame is ", "%.4f" % sparsity + "% sparse.")

get_mat_sparsity(df_all)
# Check the ID range
# ALS supports only values in the integer range
print(f"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}")
print(f"max user_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}")

Étape 3 : développer et former le modèle

Former un modèle ALS pour donner aux utilisateurs des recommandations personnalisées.

Définir le modèle

Spark ML fournit une API pratique pour générer le modèle ALS. Toutefois, le modèle ne permet pas de traiter de manière fiable des problèmes tels que la densité des données et le démarrage à froid (faire des recommandations lorsque les utilisateurs ou les éléments sont nouveaux). Pour améliorer les performances du modèle, combinez la validation croisée et l’ajustement automatique des hyperparamètres.

Utilisez ce code pour importer les bibliothèques nécessaires à la formation et à l’évaluation des modèles :

# Import Spark required libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# Specify the training parameters
num_epochs = 1  # Number of epochs; here we use 1 to reduce the training time
rank_size_list = [64]  # The values of rank in ALS for tuning
reg_param_list = [0.01, 0.1]  # The values of regParam in ALS for tuning
model_tuning_method = "TrainValidationSplit"  # TrainValidationSplit or CrossValidator
# Build the recommendation model by using ALS on the training data
# We set the cold start strategy to 'drop' to ensure that we don't get NaN evaluation metrics
als = ALS(
    maxIter=num_epochs,
    userCol="_user_id",
    itemCol="_item_id",
    ratingCol=RATING_COL,
    coldStartStrategy="drop",
    implicitPrefs=False,
    nonnegative=True,
)

Régler les hyperparamètres du modèle

L’exemple de code suivant construit une grille de paramètres pour faciliter la recherche sur les hyperparamètres. Le code crée également un évaluateur de régression qui utilise l’erreur quadratique moyenne (RMSE) comme métrique d’évaluation :

#  Construct a grid search to select the best values for the training parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_size_list)
    .addGrid(als.regParam, reg_param_list)
    .build()
)

print("Number of models to be tested: ", len(param_grid))

# Define the evaluator and set the loss function to the RMSE 
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol=RATING_COL, predictionCol="prediction"
)

L’exemple de code suivant lance différentes méthodes de réglage du modèle en fonction des paramètres préconfigurés. Pour plus d’informations sur le réglage des modèles, consultez Réglage ML : sélection de modèle et réglage des hyperparamètres sur le site web Apache Spark.

# Build cross-validation by using CrossValidator and TrainValidationSplit
if model_tuning_method == "CrossValidator":
    tuner = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=5,
        collectSubModels=True,
    )
elif model_tuning_method == "TrainValidationSplit":
    tuner = TrainValidationSplit(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        # 80% of the training data will be used for training; 20% for validation
        trainRatio=0.8,
        collectSubModels=True,
    )
else:
    raise ValueError(f"Unknown model_tuning_method: {model_tuning_method}")

Évaluer le modèle

Vous devez évaluer les modules par rapport aux données de test. Un modèle bien entraîné doit avoir des métriques élevées sur le jeu de données.

Un modèle surajusté peut nécessiter une augmentation de la taille des données d’apprentissage ou une réduction de certaines caractéristiques redondantes. Il se peut que l’architecture du modèle doive être modifiée ou que ses paramètres doivent être ajustés.

Remarque

Une valeur négative de la métrique R-carré indique que le modèle formé est moins performant qu’une ligne droite horizontale. Cette recherche suggère que le modèle entraîné n’explique pas les données.

Pour définir une fonction d’évaluation, utilisez ce code :

def evaluate(model, data, verbose=0):
    """
    Evaluate the model by computing rmse, mae, r2, and variance over the data.
    """

    predictions = model.transform(data).withColumn(
        "prediction", F.col("prediction").cast("double")
    )

    if verbose > 1:
        # Show 10 predictions
        predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(
            10
        ).show()

    # Initialize the regression evaluator
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
    rmse = _evaluator("rmse")
    mae = _evaluator("mae")
    r2 = _evaluator("r2")
    var = _evaluator("var")

    if verbose > 0:
        print(f"RMSE score = {rmse}")
        print(f"MAE score = {mae}")
        print(f"R2 score = {r2}")
        print(f"Explained variance = {var}")

    return predictions, (rmse, mae, r2, var)

Suivre l’expérience à l’aide de MLflow

Utilisez MLflow pour suivre toutes les expériences et pour enregistrer les paramètres, les métriques et les modèles. Pour démarrer la formation et l’évaluation du modèle, utilisez ce code :

from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name="als"):
    # Train models
    models = tuner.fit(train)
    best_metrics = {"RMSE": 10e6, "MAE": 10e6, "R2": 0, "Explained variance": 0}
    best_index = 0
    # Evaluate models
    # Log models, metrics, and parameters
    for idx, model in enumerate(models.subModels):
        with mlflow.start_run(nested=True, run_name=f"als_{idx}") as run:
            print("\nEvaluating on test data:")
            print(f"subModel No. {idx + 1}")
            predictions, (rmse, mae, r2, var) = evaluate(model, test, verbose=1)

            signature = infer_signature(
                train.select(["_user_id", "_item_id"]),
                predictions.select(["_user_id", "_item_id", "prediction"]),
            )
            print("log model:")
            mlflow.spark.log_model(
                model,
                f"{EXPERIMENT_NAME}-alsmodel",
                signature=signature,
                registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
                dfs_tmpdir="Files/spark",
            )
            print("log metrics:")
            current_metric = {
                "RMSE": rmse,
                "MAE": mae,
                "R2": r2,
                "Explained variance": var,
            }
            mlflow.log_metrics(current_metric)
            if rmse < best_metrics["RMSE"]:
                best_metrics = current_metric
                best_index = idx

            print("log parameters:")
            mlflow.log_params(
                {
                    "subModel_idx": idx,
                    "num_epochs": num_epochs,
                    "rank_size_list": rank_size_list,
                    "reg_param_list": reg_param_list,
                    "model_tuning_method": model_tuning_method,
                    "DATA_FOLDER": DATA_FOLDER,
                }
            )
    # Log the best model and related metrics and parameters to the parent run
    mlflow.spark.log_model(
        models.subModels[best_index],
        f"{EXPERIMENT_NAME}-alsmodel",
        signature=signature,
        registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
        dfs_tmpdir="Files/spark",
    )
    mlflow.log_metrics(best_metrics)
    mlflow.log_params(
        {
            "subModel_idx": idx,
            "num_epochs": num_epochs,
            "rank_size_list": rank_size_list,
            "reg_param_list": reg_param_list,
            "model_tuning_method": model_tuning_method,
            "DATA_FOLDER": DATA_FOLDER,
        }
    )

Sélectionnez l’expérience nommée aisample-recommendation dans votre espace de travail pour afficher les informations enregistrées pour la formation. Si vous avez modifié le nom de l’expérience, sélectionnez l’expérience qui porte le nouveau nom. Les informations journalisées ressemblent à cette image :

Screenshot of the experiment logs.

Étape 4 : charger le modèle final pour la notation et effectuer des prédictions

Une fois que vous avez terminé l’apprentissage du modèle et que vous avez sélectionné le meilleur modèle, chargez le modèle pour l’évaluation (parfois appelée inférence). Ce code charge le modèle et utilise les prédictions pour recommander les 10 meilleurs livres pour chaque utilisateur :

# Load the best model
# MLflow uses PipelineModel to wrap the original model, so we extract the original ALSModel from the stages
model_uri = f"models:/{EXPERIMENT_NAME}-alsmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark").stages[-1]

# Generate top 10 book recommendations for each user
userRecs = loaded_model.recommendForAllUsers(10)

# Represent the recommendations in an interpretable format
userRecs = (
    userRecs.withColumn("rec_exp", F.explode("recommendations"))
    .select("_user_id", F.col("rec_exp._item_id"), F.col("rec_exp.rating"))
    .join(df_items.select(["_item_id", "Book-Title"]), on="_item_id")
)
userRecs.limit(10).show()

La sortie ressemble à ce tableau :

_item_id _user_id rating Titre-livre
44865 7 7.9996786 Lasher: Lives of ...
786 7 6.2255826 The Piano Man’s D...
45330 7 4.980466 State of Mind
38960 7 4.980466 All He Ever Wanted
125415 7 4.505084 Harry Potter and ...
44939 7 4.3579073 Taltos: Lives of ...
175247 7 4.3579073 The Bonesetter’s ...
170183 7 4.228735 Living the Simple...
88503 7 4.221206 Island of the Blu...
32894 7 3.9031885 Winter Solstice

Enregistrer les prédictions dans le Lakehouse

Utilisez ce code pour écrire les recommandations de retour au lakehouse :

# Code to save userRecs into the lakehouse
userRecs.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/userRecs"
)