Tutorial: creación, evaluación e implementación de un sistema de recomendaciones

En este tutorial se muestra un ejemplo completo de un flujo de trabajo de ciencia de datos de Synapse en Microsoft Fabric. En el escenario se crea un modelo para recomendaciones de libros en línea.

En este tutorial se describen estos pasos:

  • Carga de los datos en un almacén de lago de datos
  • Realización de análisis exploratorios en los datos
  • Entrenamiento y registro de un modelo con MLflow
  • Carga del modelo y realización de predicciones

Tenemos muchos tipos de algoritmos de recomendación disponibles. En este tutorial se usa el algoritmo de factorización de matriz de alternación de mínimos cuadrados (ALS). ALS es un algoritmo de filtrado colaborativo basado en modelos.

Screenshot showing a chart of recommendation algorithms types.

ALS intenta calcular la matriz de clasificaciones R como producto de dos matrices de clasificación inferior, U y V. Aquí, R = U * Vt. Normalmente, estas aproximaciones se denominan matrices de factor.

El algoritmo ALS es iterativo. Cada iteración mantiene constante una de las matrices de factor, mientras que la otra se resuelve para usar el méotodo de mínimos cuadrados. A continuación, mantiene constante la matriz de factor recién resuelta mientras resuelve la otra matriz de factor.

Screenshot of two side-by-side factor matrices.

Requisitos previos

Seguir en un cuaderno

Puede elegir una de estas opciones para seguir en un cuaderno:

  • Abra y ejecute el cuaderno integrado en la experiencia de ciencia de datos de Synapse.
  • Cargue su cuaderno desde GitHub a la experiencia de ciencia de datos de Synapse.

Abra el cuaderno integrado

El cuaderno de muestra Book recommendation acompaña a este tutorial.

Para abrir el cuaderno de muestra integrado en el tutorial en la experiencia de ciencia de datos de Synapse:

  1. Vaya a la página principal de ciencia de datos de Synapse.

  2. Seleccione Utilizar una muestra.

  3. Seleccione la muestra correspondiente:

    • Desde la pestaña predeterminada Flujos de trabajo de un extremo a otro (Python), si la muestra es para un tutorial de Python.
    • Desde la pestaña Flujos de trabajo de un extremo a otro (R), si la muestra es para un tutorial de R.
    • En la pestaña Tutoriales rápidos, si la muestra es para un tutorial rápido.
  4. Adjunte una instancia de LakeHouse al cuaderno antes de empezar a ejecutar código.

Importación del cuaderno desde GitHub

El cuaderno AIsample - Book Recommendation.ipynb acompaña a este tutorial.

Para abrir el cuaderno complementario para este tutorial, siga las instrucciones en Preparación del sistema para los tutoriales de ciencia de datos para importar el cuaderno en el área de trabajo.

Si prefiere copiar y pegar el código de esta página, puede crear un cuaderno nuevo.

Asegúrese de adjuntar una instancia de LakeHouse al cuaderno antes de empezar a ejecutar código.

Paso 1: Carga de los datos

El conjunto de datos de recomendaciones de libro de este escenario consta de tres conjuntos de datos independientes:

Defina estos parámetros para poder utilizar este cuaderno con diferentes conjuntos de datos:

IS_CUSTOM_DATA = False  # If True, the dataset has to be uploaded manually

USER_ID_COL = "User-ID"  # Must not be '_user_id' for this notebook to run successfully
ITEM_ID_COL = "ISBN"  # Must not be '_item_id' for this notebook to run successfully
ITEM_INFO_COL = (
    "Book-Title"  # Must not be '_item_info' for this notebook to run successfully
)
RATING_COL = (
    "Book-Rating"  # Must not be '_rating' for this notebook to run successfully
)
IS_SAMPLE = True  # If True, use only <SAMPLE_ROWS> rows of data for training; otherwise, use all data
SAMPLE_ROWS = 5000  # If IS_SAMPLE is True, use only this number of rows for training

DATA_FOLDER = "Files/book-recommendation/"  # Folder that contains the datasets
ITEMS_FILE = "Books.csv"  # File that contains the item information
USERS_FILE = "Users.csv"  # File that contains the user information
RATINGS_FILE = "Ratings.csv"  # File that contains the rating information

EXPERIMENT_NAME = "aisample-recommendation"  # MLflow experiment name

Descarga y almacenamiento de los datos en un almacén de lago

Este código descarga el conjunto de datos y, a continuación, lo almacena en el almacén de lago.

Importante

Asegúrese de agregar un almacén de lago al cuaderno antes de ejecutarlo. De lo contrario, recibirá un error.

if not IS_CUSTOM_DATA:
    # Download data files into a lakehouse if they don't exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset"
    file_list = ["Books.csv", "Ratings.csv", "Users.csv"]
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

Configuración del seguimiento del experimento de MLflow

Use este código para configurar el seguimiento del experimento de MLflow. En este ejemplo se deshabilita el registro automático. Para obtener más información, consulte el artículo Registro automático en Microsoft Fabric.

# Set up MLflow for experiment tracking
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

Lectura de datos del almacén de lago

Una vez que los datos correctos estén en el almacén de lago, lea los tres conjuntos de datos en DataFrames de Spark independientes del cuaderno. Las rutas de acceso de este código usan los parámetros definidos anteriormente.

df_items = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{ITEMS_FILE}")
    .cache()
)

df_ratings = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{RATINGS_FILE}")
    .cache()
)

df_users = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"{DATA_FOLDER}/raw/{USERS_FILE}")
    .cache()
)

Paso 2: Realización de un análisis exploratorio de los datos

Mostrar datos sin procesar

Explore los DataFrames con el comando display. Con este comando, puede ver estadísticas de alto nivel de los DataFrames y comprender cómo se relacionan las distintas columnas de los conjuntos de datos entre sí. Antes de explorar los conjuntos de datos, use este código para importar las bibliotecas necesarias:

import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
import matplotlib.pyplot as plt
import seaborn as sns
color = sns.color_palette()  # Adjusting plotting style
import pandas as pd  # DataFrames

Use este código para ver el DataFrame que contiene los datos de los libros:

display(df_items, summary=True)

Agregue la columna _item_id para su uso posterior. El valor de _item_id debe ser un entero para los modelos de recomendación. Este código usa StringIndexer para transformar ITEM_ID_COL en índices:

df_items = (
    StringIndexer(inputCol=ITEM_ID_COL, outputCol="_item_id")
    .setHandleInvalid("skip")
    .fit(df_items)
    .transform(df_items)
    .withColumn("_item_id", F.col("_item_id").cast("int"))
)

Mostrar el DataFrame y comprobar si el valor de _item_id aumenta monotónicamente y sucesivamente como se esperaba:

display(df_items.sort(F.col("_item_id").desc()))

Use este código para trazar los 10 autores principales por número de libros escritos, en orden descendente. Agatha Christie es la principal autora con más de 600 libros, seguida de William Shakespeare.

df_books = df_items.toPandas() # Create a pandas DataFrame from the Spark DataFrame for visualization
plt.figure(figsize=(8,5))
sns.countplot(y="Book-Author",palette = 'Paired', data=df_books,order=df_books['Book-Author'].value_counts().index[0:10])
plt.title("Top 10 authors with maximum number of books")

Screenshot showing a graph of the top 10 authors who wrote the highest number of books.

A continuación, muestre el DataFrame que contiene los datos de usuario:

display(df_users, summary=True)

Si una fila tiene un valor de User-ID que falta, se quita esa fila. Los valores que faltan en un conjunto de datos personalizado no provocan problemas.

df_users = df_users.dropna(subset=(USER_ID_COL))
display(df_users, summary=True)

Agregue la columna _user_id para su uso posterior. Para los modelos de recomendación, el valor de _user_id debe ser un entero. El siguiente ejemplo de código usa StringIndexer para transformar USER_ID_COL en índices.

El conjunto de datos de libro ya tiene una columna User-ID de enteros. Sin embargo, si se agregase la columna _user_id para obtener compatibilidad con diferentes conjuntos de datos, este ejemplo sería más sólido. Use este código para agregar la columna _user_id:

df_users = (
    StringIndexer(inputCol=USER_ID_COL, outputCol="_user_id")
    .setHandleInvalid("skip")
    .fit(df_users)
    .transform(df_users)
    .withColumn("_user_id", F.col("_user_id").cast("int"))
)
display(df_users.sort(F.col("_user_id").desc()))

Use este código para ver los datos de clasificación:

display(df_ratings, summary=True)

Obtenga las distintas clasificaciones y guárdelas en una lista llamada ratings para su uso posterior:

ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]
print(ratings)

Use este código para mostrar los 10 libros principales con las clasificaciones más altas:

plt.figure(figsize=(8,5))
sns.countplot(y="Book-Title",palette = 'Paired',data= df_books, order=df_books['Book-Title'].value_counts().index[0:10])
plt.title("Top 10 books per number of ratings")

Según las clasificaciones, Selected Poems es el libro más popular. Aventuras de Huckleberry Finn, El jardín secreto y Drácula tienen la misma calificación.

Screenshot showing a graph of the top-rated books.

Combinar datos

Combine tres DataFrames en uno para obtener un análisis más completo:

df_all = df_ratings.join(df_users, USER_ID_COL, "inner").join(
    df_items, ITEM_ID_COL, "inner"
)
df_all_columns = [
    c for c in df_all.columns if c not in ["_user_id", "_item_id", RATING_COL]
]

# Reorder the columns to ensure that _user_id, _item_id, and Book-Rating are the first three columns
df_all = (
    df_all.select(["_user_id", "_item_id", RATING_COL] + df_all_columns)
    .withColumn("id", F.monotonically_increasing_id())
    .cache()
)

display(df_all)

Use este código para mostrar un recuento de los usuarios, libros e interacciones distintos:

print(f"Total Users: {df_users.select('_user_id').distinct().count()}")
print(f"Total Items: {df_items.select('_item_id').distinct().count()}")
print(f"Total User-Item Interactions: {df_all.count()}")

Use este código para calcular y mostrar los 10 libros más populares:

# Compute top popular products
df_top_items = (
    df_all.groupby(["_item_id"])
    .count()
    .join(df_items, "_item_id", "inner")
    .sort(["count"], ascending=[0])
)

# Find top <topn> popular items
topn = 10
pd_top_items = df_top_items.limit(topn).toPandas()
pd_top_items.head(10)

Sugerencia

Use el valor <topn> para las secciones de recomendación Popular o Productos más comprados.

# Plot top <topn> items
f, ax = plt.subplots(figsize=(10, 5))
plt.xticks(rotation="vertical")
sns.barplot(y=ITEM_INFO_COL, x="count", data=pd_top_items)
ax.tick_params(axis='x', rotation=45)
plt.xlabel("Number of Ratings for the Item")
plt.show()

Screenshot of a graph of the most popular books.

Preparación de conjuntos de datos de entrenamiento y prueba

La matriz ALS requiere cierta preparación de datos antes del entrenamiento. Use este ejemplo de código para preparar los datos. El código realiza estas acciones:

  • Convertir la columna de clasificación en el tipo correcto
  • Mostrar los datos de entrenamiento con clasificaciones de usuario
  • Dividir los datos en conjuntos de entrenamiento y de prueba
if IS_SAMPLE:
    # Must sort by '_user_id' before performing limit to ensure that ALS works normally
    # If training and test datasets have no common _user_id, ALS will fail
    df_all = df_all.sort("_user_id").limit(SAMPLE_ROWS)

# Cast the column into the correct type
df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast("float"))

# Using a fraction between 0 and 1 returns the approximate size of the dataset; for example, 0.8 means 80% of the dataset
# Rating = 0 means the user didn't rate the item, so it can't be used for training
# We use the 80% of the dataset with rating > 0 as the training dataset
fractions_train = {0: 0}
fractions_test = {0: 0}
for i in ratings:
    if i == 0:
        continue
    fractions_train[i] = 0.8
    fractions_test[i] = 1
# Training dataset
train = df_all.sampleBy(RATING_COL, fractions=fractions_train)

# Join with leftanti will select all rows from df_all with rating > 0 and not in the training dataset; for example, the remaining 20% of the dataset
# test dataset
test = df_all.join(train, on="id", how="leftanti").sampleBy(
    RATING_COL, fractions=fractions_test
)

La dispersión hace referencia a los datos de comentarios dispersos que no permiten identificar similitudes en los intereses de los usuarios. Para comprender mejor los datos y el problema actual, use este código para calcular la dispersión del conjunto de datos:

# Compute the sparsity of the dataset
def get_mat_sparsity(ratings):
    # Count the total number of ratings in the dataset - used as numerator
    count_nonzero = ratings.select(RATING_COL).count()
    print(f"Number of rows: {count_nonzero}")

    # Count the total number of distinct user_id and distinct product_id - used as denominator
    total_elements = (
        ratings.select("_user_id").distinct().count()
        * ratings.select("_item_id").distinct().count()
    )

    # Calculate the sparsity by dividing the numerator by the denominator
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings DataFrame is ", "%.4f" % sparsity + "% sparse.")

get_mat_sparsity(df_all)
# Check the ID range
# ALS supports only values in the integer range
print(f"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}")
print(f"max user_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}")

Paso 3: Desarrollo y entrenamiento del modelo

Entrene un modelo de ALS para proporcionar recomendaciones personalizadas a los usuarios.

Definición del modelo

Spark ML proporciona una API cómoda para compilar el modelo ALS. Sin embargo, el modelo no gestiona de forma fiable problemas como la dispersión de datos y el inicio en frío (haciendo recomendaciones cuando los usuarios o elementos son nuevos). Para mejorar el rendimiento del modelo, combinaremos la validación cruzada y el ajuste automático de hiperparámetros.

Use este código para importar las bibliotecas necesarias para entrenar y evaluar el modelo:

# Import Spark required libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# Specify the training parameters
num_epochs = 1  # Number of epochs; here we use 1 to reduce the training time
rank_size_list = [64]  # The values of rank in ALS for tuning
reg_param_list = [0.01, 0.1]  # The values of regParam in ALS for tuning
model_tuning_method = "TrainValidationSplit"  # TrainValidationSplit or CrossValidator
# Build the recommendation model by using ALS on the training data
# We set the cold start strategy to 'drop' to ensure that we don't get NaN evaluation metrics
als = ALS(
    maxIter=num_epochs,
    userCol="_user_id",
    itemCol="_item_id",
    ratingCol=RATING_COL,
    coldStartStrategy="drop",
    implicitPrefs=False,
    nonnegative=True,
)

Ajuste de hiperparámetros del modelo

El siguiente ejemplo de código crea una cuadrícula de parámetros para ayudar a buscar en los hiperparámetros. El código también crea un evaluador de la regresión que usa la raíz del error cuadrático medio (RMSE) como métrica de evaluación:

#  Construct a grid search to select the best values for the training parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, rank_size_list)
    .addGrid(als.regParam, reg_param_list)
    .build()
)

print("Number of models to be tested: ", len(param_grid))

# Define the evaluator and set the loss function to the RMSE 
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol=RATING_COL, predictionCol="prediction"
)

El siguiente ejemplo de código inicia diferentes métodos de ajuste de modelos en función de los parámetros preconfigurados. Para obtener más información sobre el ajuste de modelos, consulte Ajuste de ML: selección de modelos y ajuste de hiperparámetros en el sitio web de Apache Spark.

# Build cross-validation by using CrossValidator and TrainValidationSplit
if model_tuning_method == "CrossValidator":
    tuner = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=5,
        collectSubModels=True,
    )
elif model_tuning_method == "TrainValidationSplit":
    tuner = TrainValidationSplit(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        # 80% of the training data will be used for training; 20% for validation
        trainRatio=0.8,
        collectSubModels=True,
    )
else:
    raise ValueError(f"Unknown model_tuning_method: {model_tuning_method}")

Evaluación del modelo

Debe evaluar los modelos con los datos de prueba. Un modelo bien entrenado debe tener métricas elevadas en el conjunto de datos.

Un modelo sobreajustado podría necesitar un aumento en el tamaño de los datos de entrenamiento o una reducción de algunas de las características redundantes. Es posible que la arquitectura del modelo tenga que cambiar o que sus parámetros necesiten un ajuste.

Nota:

Si el valor de la métrica R cuadrado es negativo, indica que el modelo entrenado funciona peor que una línea recta horizontal. Este resultado sugiere que el modelo entrenado no explica los datos.

Para definir una función de evaluación, use este código:

def evaluate(model, data, verbose=0):
    """
    Evaluate the model by computing rmse, mae, r2, and variance over the data.
    """

    predictions = model.transform(data).withColumn(
        "prediction", F.col("prediction").cast("double")
    )

    if verbose > 1:
        # Show 10 predictions
        predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(
            10
        ).show()

    # Initialize the regression evaluator
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)

    _evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
    rmse = _evaluator("rmse")
    mae = _evaluator("mae")
    r2 = _evaluator("r2")
    var = _evaluator("var")

    if verbose > 0:
        print(f"RMSE score = {rmse}")
        print(f"MAE score = {mae}")
        print(f"R2 score = {r2}")
        print(f"Explained variance = {var}")

    return predictions, (rmse, mae, r2, var)

Realizar un seguimiento del experimento mediante MLflow

Use MLflow para supervisar todos los experimentos y registrar parámetros, métricas y modelos. Para iniciar el entrenamiento y la evaluación del modelo, use este código:

from mlflow.models.signature import infer_signature

with mlflow.start_run(run_name="als"):
    # Train models
    models = tuner.fit(train)
    best_metrics = {"RMSE": 10e6, "MAE": 10e6, "R2": 0, "Explained variance": 0}
    best_index = 0
    # Evaluate models
    # Log models, metrics, and parameters
    for idx, model in enumerate(models.subModels):
        with mlflow.start_run(nested=True, run_name=f"als_{idx}") as run:
            print("\nEvaluating on test data:")
            print(f"subModel No. {idx + 1}")
            predictions, (rmse, mae, r2, var) = evaluate(model, test, verbose=1)

            signature = infer_signature(
                train.select(["_user_id", "_item_id"]),
                predictions.select(["_user_id", "_item_id", "prediction"]),
            )
            print("log model:")
            mlflow.spark.log_model(
                model,
                f"{EXPERIMENT_NAME}-alsmodel",
                signature=signature,
                registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
                dfs_tmpdir="Files/spark",
            )
            print("log metrics:")
            current_metric = {
                "RMSE": rmse,
                "MAE": mae,
                "R2": r2,
                "Explained variance": var,
            }
            mlflow.log_metrics(current_metric)
            if rmse < best_metrics["RMSE"]:
                best_metrics = current_metric
                best_index = idx

            print("log parameters:")
            mlflow.log_params(
                {
                    "subModel_idx": idx,
                    "num_epochs": num_epochs,
                    "rank_size_list": rank_size_list,
                    "reg_param_list": reg_param_list,
                    "model_tuning_method": model_tuning_method,
                    "DATA_FOLDER": DATA_FOLDER,
                }
            )
    # Log the best model and related metrics and parameters to the parent run
    mlflow.spark.log_model(
        models.subModels[best_index],
        f"{EXPERIMENT_NAME}-alsmodel",
        signature=signature,
        registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
        dfs_tmpdir="Files/spark",
    )
    mlflow.log_metrics(best_metrics)
    mlflow.log_params(
        {
            "subModel_idx": idx,
            "num_epochs": num_epochs,
            "rank_size_list": rank_size_list,
            "reg_param_list": reg_param_list,
            "model_tuning_method": model_tuning_method,
            "DATA_FOLDER": DATA_FOLDER,
        }
    )

Seleccione el experimento denominado aisample-recommendation en el área de trabajo para ver la información registrada de la ejecución de entrenamiento. Si cambió el nombre del experimento, seleccione el experimento con que tenga el nuevo nombre. La información registrada es similar a esta imagen:

Screenshot of the experiment logs.

Paso 4: Cargar el modelo final para puntuar y realizar predicciones

Después de completar el entrenamiento del modelo y, a continuación, seleccionar el mejor modelo, cargue el modelo para puntuarlo (a veces denominado inferencia). Este código carga el modelo y usa predicciones para recomendar los principales 10 libros de cada usuario:

# Load the best model
# MLflow uses PipelineModel to wrap the original model, so we extract the original ALSModel from the stages
model_uri = f"models:/{EXPERIMENT_NAME}-alsmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark").stages[-1]

# Generate top 10 book recommendations for each user
userRecs = loaded_model.recommendForAllUsers(10)

# Represent the recommendations in an interpretable format
userRecs = (
    userRecs.withColumn("rec_exp", F.explode("recommendations"))
    .select("_user_id", F.col("rec_exp._item_id"), F.col("rec_exp.rating"))
    .join(df_items.select(["_item_id", "Book-Title"]), on="_item_id")
)
userRecs.limit(10).show()

La salida será similar a esta tabla:

_item_id _user_id rating Título del libro
44865 7 7.9996786 Lasher: Lives of ...
786 7 6.2255826 The Piano Man's D...
45330 7 4.980466 State of Mind
38960 7 4.980466 All He Ever Wanted
125415 7 4.505084 Harry Potter y ...
44939 7 4.3579073 Taltos: Lives of ...
175247 7 4.3579073 The Bonesetter's ...
170183 7 4.228735 Living the Simple...
88503 7 4.221206 Island of the Blu...
32894 7 3.9031885 Solsticio de invierno

Guardar las predicciones en el almacén de lago

Use este código para escribir las recomendaciones en el almacén de lago:

# Code to save userRecs into the lakehouse
userRecs.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/userRecs"
)