教學課程:建立、評估和評分建議系統
本教學課程提供 Microsoft Fabric 中 Synapse 資料科學 工作流程的端對端範例。 此案例會建置在線書籍建議的模型。
本教學課程涵蓋下列步驟:
- 將數據上傳至 Lakehouse
- 對數據執行探勘分析
- 定型模型,並使用 MLflow 加以記錄
- 載入模型並進行預測
我們有許多類型的建議演算法可供使用。 本教學課程使用交替最小平方 (ALS) 矩陣分解演算法。 ALS 是以模型為基礎的共同作業篩選演算法。
ALS 會嘗試將評等矩陣 R 估計為兩個低階矩陣的乘積,即您和 V。在這裡,R = U * Vt。 通常,這些近似值稱為 因數 矩陣。
ALS 演算法是反覆的。 每個反覆運算都會保存其中一個因數矩陣常數,而它會使用最小平方的 方法來解決另一個。 然後,它會保留新解析的因數矩陣常數,同時解決其他因數矩陣。
必要條件
取得 Microsoft Fabric 訂用 帳戶。 或者,註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左側的體驗切換器,切換至 Synapse 資料科學 體驗。
- 如有必要,請建立 Microsoft Fabric lakehouse,如在 Microsoft Fabric 中建立 lakehouse 中所述。
在筆記本中跟著
您可以選擇下列其中一個選項,以遵循筆記本:
- 在 Synapse 資料科學 體驗中開啟並執行內建筆記本
- 將筆記本從 GitHub 上傳至 Synapse 資料科學 體驗
開啟內建筆記本
本教學課程隨附範例 Book 建議 筆記本。
若要在 Synapse 資料科學 體驗中開啟教學課程的內建範例筆記本:
移至 Synapse 資料科學 首頁。
選取 [ 使用範例]。
選取對應的範例:
- 如果範例適用於 Python 教學課程,請從預設 的端對端工作流程 (Python) 索引標籤。
- 如果範例適用於 R 教學課程,請從 [端對端工作流程] 索引標籤。
- 如果範例適用於快速教學課程,請從 [ 快速教學 課程] 索引卷標。
從 GitHub 匯入筆記本
AIsample - Book Recommendation.ipynb Notebook 隨附本教學課程。
若要開啟本教學課程隨附的筆記本,請遵循準備系統以進行數據科學教學課程中的指示,將筆記本匯入您的工作區。
如果您想要複製並貼上此頁面中的程式碼,您可以 建立新的筆記本。
開始執行程序代碼之前,請務必將 Lakehouse 附加至筆記本 。
步驟 1:載入數據
此案例中的書籍建議數據集包含三個不同的數據集:
Books.csv:國際標準書號(ISBN)會識別每本書,並移除無效的日期。 數據集也包含標題、作者和發行者。 對於具有多個作者的 書籍,Books.csv 檔案只會列出第一位作者。 URL 會以三種大小指向 Amazon 網站資源的封面影像。
Isbn 書名 Book-Author 發行年 發行者 Image-URL-S Image-URL-M Image-URL-l 0195153448 古典神話 馬克·奧·莫福德 2002 牛津大學出版社 http://images.amazon.com/images/P/0195153448.01.THUMBZZZ.jpg http://images.amazon.com/images/P/0195153448.01.MZZZZZZZ.jpg http://images.amazon.com/images/P/0195153448.01.LZZZZZZZ.jpg 0002005018 克拉拉·卡蘭 理查·布魯斯·賴特 2001 哈珀弗拉明戈加拿大 http://images.amazon.com/images/P/0002005018.01.THUMBZZZ.jpg http://images.amazon.com/images/P/0002005018.01.MZZZZZZZ.jpg http://images.amazon.com/images/P/0002005018.01.LZZZZZZZ.jpg Ratings.csv:每本書的評分是明確的(由使用者提供,以 1 到 10 的尺規提供)或隱含的 (觀察沒有使用者輸入,並以 0 表示)。
用戶標識碼 Isbn 書籍評等 276725 034545104X 0 276726 0155061224 5 Users.csv:使用者標識碼會匿名並對應至整數。 人口統計數據 - 例如位置與年齡 - 提供,如果有的話。 如果無法使用此資料,這些值為
null
。用戶標識碼 Location 年齡 1 “nyc 紐約美國” 2 “斯托克頓加州美國” 18.0
定義這些參數,讓您可以使用不同的數據集來建立此筆記本:
IS_CUSTOM_DATA = False # If True, the dataset has to be uploaded manually
USER_ID_COL = "User-ID" # Must not be '_user_id' for this notebook to run successfully
ITEM_ID_COL = "ISBN" # Must not be '_item_id' for this notebook to run successfully
ITEM_INFO_COL = (
"Book-Title" # Must not be '_item_info' for this notebook to run successfully
)
RATING_COL = (
"Book-Rating" # Must not be '_rating' for this notebook to run successfully
)
IS_SAMPLE = True # If True, use only <SAMPLE_ROWS> rows of data for training; otherwise, use all data
SAMPLE_ROWS = 5000 # If IS_SAMPLE is True, use only this number of rows for training
DATA_FOLDER = "Files/book-recommendation/" # Folder that contains the datasets
ITEMS_FILE = "Books.csv" # File that contains the item information
USERS_FILE = "Users.csv" # File that contains the user information
RATINGS_FILE = "Ratings.csv" # File that contains the rating information
EXPERIMENT_NAME = "aisample-recommendation" # MLflow experiment name
下載數據並將其儲存在 Lakehouse 中
此程式代碼會下載數據集,然後將它儲存在 Lakehouse 中。
重要
在執行筆記本之前, 請務必將 Lakehouse 新增至筆記本。 否則,您會收到錯誤。
if not IS_CUSTOM_DATA:
# Download data files into a lakehouse if they don't exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Book-Recommendation-Dataset"
file_list = ["Books.csv", "Ratings.csv", "Users.csv"]
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
設定 MLflow 實驗追蹤
使用此程式代碼來設定 MLflow 實驗追蹤。 此範例會停用自動記錄。 如需詳細資訊,請參閱 Microsoft Fabric 中的自動記錄文章。
# Set up MLflow for experiment tracking
import mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True) # Disable MLflow autologging
從 Lakehouse 讀取數據
將正確的數據放入 Lakehouse 之後,請將這三個數據集讀入筆記本中的個別 Spark DataFrame。 此程式代碼中的檔案路徑會使用稍早定義的參數。
df_items = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv(f"{DATA_FOLDER}/raw/{ITEMS_FILE}")
.cache()
)
df_ratings = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv(f"{DATA_FOLDER}/raw/{RATINGS_FILE}")
.cache()
)
df_users = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv(f"{DATA_FOLDER}/raw/{USERS_FILE}")
.cache()
)
步驟 2:執行探勘數據分析
顯示原始數據
使用 display
命令探索 DataFrame。 使用此命令,您可以檢視高階 DataFrame 統計數據,並瞭解不同的數據集數據行彼此的關聯性。 探索資料集之前,請先使用此程式代碼匯入必要的連結庫:
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
import matplotlib.pyplot as plt
import seaborn as sns
color = sns.color_palette() # Adjusting plotting style
import pandas as pd # DataFrames
使用此程式代碼來檢視包含書籍資料的 DataFrame:
display(df_items, summary=True)
新增數據行 _item_id
以供日後使用。 值 _item_id
必須是建議模型的整數。 此程式代碼會使用 StringIndexer
轉換成 ITEM_ID_COL
索引:
df_items = (
StringIndexer(inputCol=ITEM_ID_COL, outputCol="_item_id")
.setHandleInvalid("skip")
.fit(df_items)
.transform(df_items)
.withColumn("_item_id", F.col("_item_id").cast("int"))
)
顯示 DataFrame,並檢查值是否 _item_id
依預期以單調和連續方式增加:
display(df_items.sort(F.col("_item_id").desc()))
使用此程式代碼以遞減順序繪製前 10 位作者的書籍數目。 阿加莎·克裡斯蒂是擁有600多本書的主要作者,其次是威廉·莎士比亞。
df_books = df_items.toPandas() # Create a pandas DataFrame from the Spark DataFrame for visualization
plt.figure(figsize=(8,5))
sns.countplot(y="Book-Author",palette = 'Paired', data=df_books,order=df_books['Book-Author'].value_counts().index[0:10])
plt.title("Top 10 authors with maximum number of books")
接下來,顯示包含用戶數據的 DataFrame:
display(df_users, summary=True)
如果數據列有遺漏 User-ID
的值,請卸除該數據列。 自訂數據集中的遺漏值不會造成問題。
df_users = df_users.dropna(subset=(USER_ID_COL))
display(df_users, summary=True)
新增數據行 _user_id
以供稍後使用。 對於建議模型,此值 _user_id
必須是整數。 下列程式代碼範例會使用 StringIndexer
轉換成 USER_ID_COL
索引。
書籍數據集已經有整數 User-ID
數據行。 不過,新增 _user_id
數據行以與不同的數據集相容,讓此範例更加健全。 使用此程式代碼來新增資料 _user_id
列:
df_users = (
StringIndexer(inputCol=USER_ID_COL, outputCol="_user_id")
.setHandleInvalid("skip")
.fit(df_users)
.transform(df_users)
.withColumn("_user_id", F.col("_user_id").cast("int"))
)
display(df_users.sort(F.col("_user_id").desc()))
使用此程式代碼來檢視評等資料:
display(df_ratings, summary=True)
取得相異評等,並儲存它們以供稍後在名為 ratings
的清單中使用:
ratings = [i[0] for i in df_ratings.select(RATING_COL).distinct().collect()]
print(ratings)
使用此程式代碼來顯示評分最高的前 10 本書:
plt.figure(figsize=(8,5))
sns.countplot(y="Book-Title",palette = 'Paired',data= df_books, order=df_books['Book-Title'].value_counts().index[0:10])
plt.title("Top 10 books per number of ratings")
根據收視率, 精選詩 是最受歡迎的書。 赫克貝里芬恩的冒險, 秘密花園,德 庫拉 有相同的評級。
合併資料
將三個 DataFrame 合併成一個 DataFrame,以進行更全面的分析:
df_all = df_ratings.join(df_users, USER_ID_COL, "inner").join(
df_items, ITEM_ID_COL, "inner"
)
df_all_columns = [
c for c in df_all.columns if c not in ["_user_id", "_item_id", RATING_COL]
]
# Reorder the columns to ensure that _user_id, _item_id, and Book-Rating are the first three columns
df_all = (
df_all.select(["_user_id", "_item_id", RATING_COL] + df_all_columns)
.withColumn("id", F.monotonically_increasing_id())
.cache()
)
display(df_all)
使用此程式代碼來顯示相異使用者、書籍和互動的計數:
print(f"Total Users: {df_users.select('_user_id').distinct().count()}")
print(f"Total Items: {df_items.select('_item_id').distinct().count()}")
print(f"Total User-Item Interactions: {df_all.count()}")
計算及繪製最受歡迎的專案
使用此程式代碼來計算及顯示前 10 大熱門書籍:
# Compute top popular products
df_top_items = (
df_all.groupby(["_item_id"])
.count()
.join(df_items, "_item_id", "inner")
.sort(["count"], ascending=[0])
)
# Find top <topn> popular items
topn = 10
pd_top_items = df_top_items.limit(topn).toPandas()
pd_top_items.head(10)
提示
使用熱門<topn>
或熱門購買的建議區段的值。
# Plot top <topn> items
f, ax = plt.subplots(figsize=(10, 5))
plt.xticks(rotation="vertical")
sns.barplot(y=ITEM_INFO_COL, x="count", data=pd_top_items)
ax.tick_params(axis='x', rotation=45)
plt.xlabel("Number of Ratings for the Item")
plt.show()
準備定型和測試數據集
ALS 矩陣在定型之前需要一些數據準備。 使用此程式碼範例來準備數據。 程式代碼會執行下列動作:
- 將評等數據行轉換成正確的類型
- 使用用戶評等來取樣定型數據
- 將數據分割成定型和測試數據集
if IS_SAMPLE:
# Must sort by '_user_id' before performing limit to ensure that ALS works normally
# If training and test datasets have no common _user_id, ALS will fail
df_all = df_all.sort("_user_id").limit(SAMPLE_ROWS)
# Cast the column into the correct type
df_all = df_all.withColumn(RATING_COL, F.col(RATING_COL).cast("float"))
# Using a fraction between 0 and 1 returns the approximate size of the dataset; for example, 0.8 means 80% of the dataset
# Rating = 0 means the user didn't rate the item, so it can't be used for training
# We use the 80% of the dataset with rating > 0 as the training dataset
fractions_train = {0: 0}
fractions_test = {0: 0}
for i in ratings:
if i == 0:
continue
fractions_train[i] = 0.8
fractions_test[i] = 1
# Training dataset
train = df_all.sampleBy(RATING_COL, fractions=fractions_train)
# Join with leftanti will select all rows from df_all with rating > 0 and not in the training dataset; for example, the remaining 20% of the dataset
# test dataset
test = df_all.join(train, on="id", how="leftanti").sampleBy(
RATING_COL, fractions=fractions_test
)
疏鬆是指疏鬆的意見反應數據,無法識別用戶興趣的相似性。 若要進一步了解數據和目前的問題,請使用此程式代碼來計算數據集疏疏:
# Compute the sparsity of the dataset
def get_mat_sparsity(ratings):
# Count the total number of ratings in the dataset - used as numerator
count_nonzero = ratings.select(RATING_COL).count()
print(f"Number of rows: {count_nonzero}")
# Count the total number of distinct user_id and distinct product_id - used as denominator
total_elements = (
ratings.select("_user_id").distinct().count()
* ratings.select("_item_id").distinct().count()
)
# Calculate the sparsity by dividing the numerator by the denominator
sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
print("The ratings DataFrame is ", "%.4f" % sparsity + "% sparse.")
get_mat_sparsity(df_all)
# Check the ID range
# ALS supports only values in the integer range
print(f"max user_id: {df_all.agg({'_user_id': 'max'}).collect()[0][0]}")
print(f"max user_id: {df_all.agg({'_item_id': 'max'}).collect()[0][0]}")
步驟 3:開發和定型模型
將 ALS 模型定型,以提供用戶個人化的建議。
定義模型
Spark ML 提供方便的 API 來建置 ALS 模型。 不過,模型無法可靠地處理數據疏疏和冷啟動等問題(當使用者或專案是新的時提出建議)。 若要改善模型效能,請結合交叉驗證和自動超參數微調。
使用此程式代碼匯入模型定型和評估所需的連結庫:
# Import Spark required libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
# Specify the training parameters
num_epochs = 1 # Number of epochs; here we use 1 to reduce the training time
rank_size_list = [64] # The values of rank in ALS for tuning
reg_param_list = [0.01, 0.1] # The values of regParam in ALS for tuning
model_tuning_method = "TrainValidationSplit" # TrainValidationSplit or CrossValidator
# Build the recommendation model by using ALS on the training data
# We set the cold start strategy to 'drop' to ensure that we don't get NaN evaluation metrics
als = ALS(
maxIter=num_epochs,
userCol="_user_id",
itemCol="_item_id",
ratingCol=RATING_COL,
coldStartStrategy="drop",
implicitPrefs=False,
nonnegative=True,
)
微調模型超參數
下一個程式代碼範例會建構參數方格,以協助搜尋超參數。 此程式代碼也會建立回歸評估工具,其使用根平均平方誤差 (RMSE) 作為評估計量:
# Construct a grid search to select the best values for the training parameters
param_grid = (
ParamGridBuilder()
.addGrid(als.rank, rank_size_list)
.addGrid(als.regParam, reg_param_list)
.build()
)
print("Number of models to be tested: ", len(param_grid))
# Define the evaluator and set the loss function to the RMSE
evaluator = RegressionEvaluator(
metricName="rmse", labelCol=RATING_COL, predictionCol="prediction"
)
下一個程式代碼範例會根據預先設定的參數起始不同的模型微調方法。 如需模型微調的詳細資訊,請參閱 ML 微調:Apache Spark 網站上的模型選取和超參數微調 。
# Build cross-validation by using CrossValidator and TrainValidationSplit
if model_tuning_method == "CrossValidator":
tuner = CrossValidator(
estimator=als,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=5,
collectSubModels=True,
)
elif model_tuning_method == "TrainValidationSplit":
tuner = TrainValidationSplit(
estimator=als,
estimatorParamMaps=param_grid,
evaluator=evaluator,
# 80% of the training data will be used for training; 20% for validation
trainRatio=0.8,
collectSubModels=True,
)
else:
raise ValueError(f"Unknown model_tuning_method: {model_tuning_method}")
評估模型
您應該根據測試數據評估模組。 定型良好的模型應該在數據集上具有高計量。
過度調整的模型可能需要增加定型數據的大小,或減少一些備援功能。 模型架構可能需要變更,或其參數可能需要一些微調。
注意
負 R 平方度量值表示定型的模型執行比水準直線更差。 這項發現表明已定型的模型不會解釋數據。
若要定義評估函式,請使用下列程式代碼:
def evaluate(model, data, verbose=0):
"""
Evaluate the model by computing rmse, mae, r2, and variance over the data.
"""
predictions = model.transform(data).withColumn(
"prediction", F.col("prediction").cast("double")
)
if verbose > 1:
# Show 10 predictions
predictions.select("_user_id", "_item_id", RATING_COL, "prediction").limit(
10
).show()
# Initialize the regression evaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=RATING_COL)
_evaluator = lambda metric: evaluator.setMetricName(metric).evaluate(predictions)
rmse = _evaluator("rmse")
mae = _evaluator("mae")
r2 = _evaluator("r2")
var = _evaluator("var")
if verbose > 0:
print(f"RMSE score = {rmse}")
print(f"MAE score = {mae}")
print(f"R2 score = {r2}")
print(f"Explained variance = {var}")
return predictions, (rmse, mae, r2, var)
使用 MLflow 追蹤實驗
使用 MLflow 來追蹤所有實驗,以及記錄參數、計量和模型。 若要開始模型定型和評估,請使用下列程序代碼:
from mlflow.models.signature import infer_signature
with mlflow.start_run(run_name="als"):
# Train models
models = tuner.fit(train)
best_metrics = {"RMSE": 10e6, "MAE": 10e6, "R2": 0, "Explained variance": 0}
best_index = 0
# Evaluate models
# Log models, metrics, and parameters
for idx, model in enumerate(models.subModels):
with mlflow.start_run(nested=True, run_name=f"als_{idx}") as run:
print("\nEvaluating on test data:")
print(f"subModel No. {idx + 1}")
predictions, (rmse, mae, r2, var) = evaluate(model, test, verbose=1)
signature = infer_signature(
train.select(["_user_id", "_item_id"]),
predictions.select(["_user_id", "_item_id", "prediction"]),
)
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-alsmodel",
signature=signature,
registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
current_metric = {
"RMSE": rmse,
"MAE": mae,
"R2": r2,
"Explained variance": var,
}
mlflow.log_metrics(current_metric)
if rmse < best_metrics["RMSE"]:
best_metrics = current_metric
best_index = idx
print("log parameters:")
mlflow.log_params(
{
"subModel_idx": idx,
"num_epochs": num_epochs,
"rank_size_list": rank_size_list,
"reg_param_list": reg_param_list,
"model_tuning_method": model_tuning_method,
"DATA_FOLDER": DATA_FOLDER,
}
)
# Log the best model and related metrics and parameters to the parent run
mlflow.spark.log_model(
models.subModels[best_index],
f"{EXPERIMENT_NAME}-alsmodel",
signature=signature,
registered_model_name=f"{EXPERIMENT_NAME}-alsmodel",
dfs_tmpdir="Files/spark",
)
mlflow.log_metrics(best_metrics)
mlflow.log_params(
{
"subModel_idx": idx,
"num_epochs": num_epochs,
"rank_size_list": rank_size_list,
"reg_param_list": reg_param_list,
"model_tuning_method": model_tuning_method,
"DATA_FOLDER": DATA_FOLDER,
}
)
從工作區選取名為 aisample-recommendation
的實驗,以檢視定型回合的記錄資訊。 如果您變更實驗名稱,請選取具有新名稱的實驗。 記錄的資訊類似下圖:
步驟 4:載入最終模型以進行評分並進行預測
完成模型定型之後,然後選取最佳模型,載入模型以進行評分(有時稱為推斷)。 此程式代碼會載入模型,並使用預測來建議每個使用者的前 10 本書:
# Load the best model
# MLflow uses PipelineModel to wrap the original model, so we extract the original ALSModel from the stages
model_uri = f"models:/{EXPERIMENT_NAME}-alsmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark").stages[-1]
# Generate top 10 book recommendations for each user
userRecs = loaded_model.recommendForAllUsers(10)
# Represent the recommendations in an interpretable format
userRecs = (
userRecs.withColumn("rec_exp", F.explode("recommendations"))
.select("_user_id", F.col("rec_exp._item_id"), F.col("rec_exp.rating"))
.join(df_items.select(["_item_id", "Book-Title"]), on="_item_id")
)
userRecs.limit(10).show()
輸出類似下表:
_item_id | _user_id | 評等 | 書名 |
---|---|---|---|
44865 | 7 | 7.9996786 | 拉舍爾:生活... |
786 | 7 | 6.2255826 | 鋼琴俠的D... |
45330 | 7 | 4.980466 | 心態 |
38960 | 7 | 4.980466 | 他曾經想要的一切 |
125415 | 7 | 4.505084 | 哈利·波特和... |
44939 | 7 | 4.3579073 | 塔爾托斯:生活... |
175247 | 7 | 4.3579073 | 博內塞特的... |
170183 | 7 | 4.228735 | 生活簡單... |
88503 | 7 | 4.221206 | 布魯島... |
32894 | 7 | 3.9031885 | 冬至 |
將預測儲存到湖屋
使用此程式代碼將建議寫回 Lakehouse:
# Code to save userRecs into the lakehouse
userRecs.write.format("delta").mode("overwrite").save(
f"{DATA_FOLDER}/predictions/userRecs"
)
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應