共用方式為


教學課程:建立、評估文字分類模型及評分

本教學課程會在 Microsoft Fabric 中,針對文字分類模型呈現 Synapse 資料科學工作流程的端對端範例。 該情境結合 Word2vec 自然語言處理(NLP)及邏輯迴歸,在 Spark 上用來判斷英國圖書館書籍資料集中書籍的類型。 這個判斷完全取決於書名。

本教學課程涵蓋了下列步驟:

  • 安裝自訂程式庫
  • 載入資料
  • 使用探索式資料分析來了解和處理資料
  • 用 Word2vec NLP 和邏輯迴歸訓練機器學習模型,並使用 MLflow 和 Fabric 自動記錄功能追蹤實驗
  • 載入機器學習模型以進行評分和預測

必要條件

遵循筆記本中的指示

要在筆記本中進行操作,你有以下選擇:

  • 開啟並執行內建筆記本。
  • 從 GitHub 上傳您的筆記本。

開啟內建筆記本

本教學課程隨附範例標題內容類型分類筆記本。

  1. 若要開啟本次教學課程的範例筆記本,請遵循 準備系統以進行資料科學教學的指示

  2. 開始執行程序代碼之前,請務必 將 lakehouse 附加至筆記本

從 GitHub 匯入筆記本

AIsample - 標題內容類型 Classification.ipynb 是本教學課程隨附的筆記本。

步驟 1:安裝自訂程式庫

針對機器學習模型開發或臨機操作資料分析,您可能需要快速安裝 Apache Spark 工作階段的自訂程式庫。 你有兩種安裝函式庫的選擇。

  • 要只在你目前的筆記本上安裝函式庫,請使用筆記本的內嵌安裝功能(%pip%conda)。
  • 作為替代方案,你可以建立 Fabric 環境,從公開來源安裝函式庫,或上傳自訂函式庫。 接著,你的工作區管理員可以將該環境附加為工作區的預設。 此時,環境中所有函式庫即可在該工作區的所有筆記本及 Spark 工作定義中使用。 欲了解更多環境資訊,請造訪 在 Microsoft Fabric 中建立、配置及使用環境 資源。

在分類模型中,使用 wordcloud 函式庫來表示文字中的詞頻。 在資源中 wordcloud ,單字的大小代表其頻率。 在本教學課程中,使用 %pip install 在您的筆記本中安裝 wordcloud

注意

執行 %pip install 之後,PySpark 核心會重新啟動。 在運行其他儲存區之前,先安裝你需要的函式庫。

# Install wordcloud for text visualization by using pip
%pip install wordcloud

步驟 2:載入資料

英國圖書館書籍資料集包含關於大英圖書館書籍的元資料。 圖書館與 Microsoft 合作將原始資源數位化,成為該資料集。 元資料是分類資訊,用來判斷一本書是小說還是非小說。 下圖顯示資料集的列樣本。

BL 記錄 ID 資源的類型 名稱 與名稱關聯的資料 名稱類型 角色 所有名稱 標題 變化標題 系列標題 系列中的編號 出版物國家/地區 出版物的類型 發行者 出版物的日期 版本(Edition) 實體描述 Dewey 分類 BL 貨架標記 主題 Genre 語言 備註 實體資源的 BL 記錄 ID classification_id user_id created_at subject_ids annotator_date_pub annotator_normalised_date_pub annotator_edition_statement annotator_genre annotator_FAST_genre_terms annotator_FAST_subject_terms annotator_comments annotator_main_language annotator_other_languages_summaries annotator_summaries_language annotator_translation annotator_original_language annotator_publisher annotator_place_pub annotator_country annotator_title 連結至數位化書籍 已標註
014602826 專著 Yearsley、Ann 1753-1806 person 漢娜·莫爾,1745-1833 [人物];耶斯利,安,1753-1806 [人物] 多重場合的詩篇(附上漢娜·莫爾的序言信) 英格蘭 London 1786 第四版手稿筆記 數位商店 11644.d.32 英語 003996603 False
014602830 專著 A, T。 person 奧爾德姆,約翰,1653-1683 [人物];A, T. [人物] 一個薩堤爾對抗維爾圖。 (一首詩:據說是由一位 Town-Hector 朗誦的詩[約翰·奧爾德姆所作。序言署名:T. A.]) 英格蘭 London 1679 15頁(4°) 數位商店 11602.ee.10。 (2.) 英語 000001143 False

利用這個資料集,我們的目標是訓練一個僅根據書名來判斷書籍類型的分類模型。

請定義以下參數,以便將此筆記本應用於不同資料集:

IS_CUSTOM_DATA = False  # If True, the user must manually upload the dataset
DATA_FOLDER = "Files/title-genre-classification"
DATA_FILE = "blbooksgenre.csv"

# Data schema
TEXT_COL = "Title"
LABEL_COL = "annotator_genre"
LABELS = ["Fiction", "Non-fiction"]

EXPERIMENT_NAME = "sample-aisample-textclassification"  # MLflow experiment name

下載資料集並上傳至 Lakehouse

以下程式碼片段下載公開版本的資料集,並將其儲存在 Fabric 湖屋中:

重要

在執行筆記本之前,新增 Lakehouse 至筆記本。 未做到則會發生錯誤。

if not IS_CUSTOM_DATA:
    # Download demo data files into the lakehouse, if they don't exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.z13.web.core.windows.net/data/Title_Genre_Classification"
    fname = "blbooksgenre.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        # Add a lakehouse, if no default lakehouse was added to the notebook
        # A new notebook won't link to any lakehouse by default
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

匯入必要的程式庫

在任何處理前,您必須匯入所需的函式庫,包括 SparkSynapseML 的函式庫:

import numpy as np
from itertools import chain

from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

from synapse.ml.stages import ClassBalancer
from synapse.ml.train import ComputeModelStatistics

import mlflow

調整超參數

以下程式碼片段定義了模型訓練所需的超參數:

重要

只有在您了解每個參數後,才可修改這些超參數。

# Hyperparameters 
word2vec_size = 128  # The length of the vector for each word
min_word_count = 3  # The minimum number of times that a word must appear to be considered
max_iter = 10  # The maximum number of training iterations
k_folds = 3  # The number of folds for cross-validation

開始記錄執行此筆記本所需的時間:

# Record the notebook running time
import time

ts = time.time()

設定 MLflow 實驗追蹤

自動記錄可擴充 MLflow 記錄功能。 自動記錄會在訓練時,自動擷取機器學習模型的輸入參數值和輸出計量。 接著,您會將此資訊記錄到工作區。 在工作區中,您可以使用工作區中的 MLflow API 或對應的實驗來存取和視覺化資訊。 欲了解更多關於自動記錄的資訊,請造訪 Microsoft Fabric 中的自動記錄 資源。

若要停用筆記本工作階段的 Microsoft Fabric 自動記錄,請呼叫 mlflow.autolog() 並設定 disable=True

# Set up Mlflow for experiment tracking

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable Mlflow autologging

從 Lakehouse 讀取原始日期資料

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True)

步驟 3:執行探索式資料分析

使用 display 命令來探索資料集,以檢視資料集的高層級統計資料,以及顯示圖表檢視:

display(raw_df.limit(20))

準備資料

要清理資料,請移除重複的部分:

df = (
    raw_df.select([TEXT_COL, LABEL_COL])
    .where(F.col(LABEL_COL).isin(LABELS))
    .dropDuplicates([TEXT_COL])
    .cache()
)

display(df.limit(20))

套用類別平衡以解決任何偏差:

# Create a ClassBalancer instance, and set the input column to LABEL_COL
cb = ClassBalancer().setInputCol(LABEL_COL)

# Fit the ClassBalancer instance to the input DataFrame, and transform the DataFrame
df = cb.fit(df).transform(df)

# Display the first 20 rows of the transformed DataFrame
display(df.limit(20))

為了標記資料集,將段落和句子拆分成較小的單元。 如此一來,指派意義會變得更容易。 接著,移除停止字以提升表現。 停用字詞移除涉及移除通常發生在語料庫所有文件中的字組。 停用字詞移除是自然語言處理 (NLP) 應用程式中最常使用的前置處理步驟之一。 以下程式碼摘要涵蓋這些步驟:

# Text transformer
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

token_df = pipeline.fit(df).transform(df)

display(token_df.limit(20))

顯示每個類別的 wordcloud 程式庫。 詞雲庫呈現經常出現在文字資料中的關鍵字,是一種視覺上非常醒目的呈現方式。 字雲庫之所以有效,是因為關鍵字渲染呈現出如雲朵般的色彩畫面,能更好地一目了然地捕捉主要文字資料。 欲了解更多關於 wordcloud 的資訊,請造訪 此資源

以下程式碼摘要涵蓋這些步驟:

# WordCloud
for label in LABELS:
    tokens = (
        token_df.where(F.col(LABEL_COL) == label)
        .select(F.explode("filtered_tokens").alias("token"))
        .where(F.col("token").rlike(r"^\w+$"))
    )

    top50_tokens = (
        tokens.groupBy("token").count().orderBy(F.desc("count")).limit(50).collect()
    )

    # Generate a wordcloud image
    wordcloud = WordCloud(
        scale=10,
        background_color="white",
        random_state=42,  # Make sure the output is always the same for the same input
    ).generate_from_frequencies(dict(top50_tokens))

    # Display the generated image by using matplotlib
    plt.figure(figsize=(10, 10))
    plt.title(label, fontsize=20)
    plt.axis("off")
    plt.imshow(wordcloud, interpolation="bilinear")

最後,使用 Word2vec NLP 將文字向量化。 Word2vec NLP 技術為文本中的每個字建立向量表示。 在類似內容或具有語意關聯性的文字中,透過向量空間中的接近度來有效地擷取。 此接近度表示類似的字詞具有類似的字詞向量。 以下程式碼摘要涵蓋這些步驟:

# Label transformer
label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol="labelIdx")
vectorizer = Word2Vec(
    vectorSize=word2vec_size,
    minCount=min_word_count,
    inputCol="filtered_tokens",
    outputCol="features",
)

# Build the pipeline
pipeline = Pipeline(stages=[label_indexer, vectorizer])
vec_df = (
    pipeline.fit(token_df)
    .transform(token_df)
    .select([TEXT_COL, LABEL_COL, "features", "labelIdx", "weight"])
)

display(vec_df.limit(20))

步驟 4︰訓練及評估模型

設置資料後,定義模型。 在本節中,您將訓練羅吉斯迴歸模型,以對向量化文字分類。

準備訓練與測試的資料集

以下程式碼片段將資料集拆分:

# Split the dataset into training and testing
(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)

追蹤機器學習實驗

機器學習實驗追蹤管理所有實驗及其組成部分——例如參數、指標、模型及其他產物。 追蹤使得組織和管理特定機器學習實驗所需的所有元件成為可能。 它還可讓您使用儲存的實驗,輕鬆重現過去的結果。 欲了解更多資訊,請造訪 Microsoft Fabric 中的機器學習實驗

機器學習實驗是所有相關機器學習執行的組織和控制主要單位。 執行會對應至模型程式碼的單一執行。 以下程式碼摘要涵蓋這些步驟:

# Build the logistic regression classifier
lr = (
    LogisticRegression()
    .setMaxIter(max_iter)
    .setFeaturesCol("features")
    .setLabelCol("labelIdx")
    .setWeightCol("weight")
)

調整超參數

建置參數方格以搜尋超參數。 接著建立交叉評估器估計器,產生 CrossValidator 模型,如以下程式碼片段所示:

# Build a grid search to select the best values for the training parameters
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.03, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.1])
    .build()
)

if len(LABELS) > 2:
    evaluator_cls = MulticlassClassificationEvaluator
    evaluator_metrics = ["f1", "accuracy"]
else:
    evaluator_cls = BinaryClassificationEvaluator
    evaluator_metrics = ["areaUnderROC", "areaUnderPR"]
evaluator = evaluator_cls(labelCol="labelIdx", weightCol="weight")

# Build a cross-evaluator estimator
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=k_folds,
    collectSubModels=True,
)

評估模型

我們可以評估測試資料集上的模型,以對其進行比較。 一個訓練良好的模型在與驗證及測試資料集對比時,應在相關指標上展現高效能。 以下程式碼摘要涵蓋這些步驟:

def evaluate(model, df):
    log_metric = {}
    prediction = model.transform(df)
    for metric in evaluator_metrics:
        value = evaluator.evaluate(prediction, {evaluator.metricName: metric})
        log_metric[metric] = value
        print(f"{metric}: {value:.4f}")
    return prediction, log_metric

使用 MLflow 追蹤實驗

啟動訓練和評估程序。 使用 MLflow 追蹤所有實驗,並記錄參數、指標和模型。 在工作區中,所有這些資訊都以實驗名稱記錄。 以下程式碼摘要涵蓋這些步驟:

with mlflow.start_run(run_name="lr"):
    models = crossval.fit(train_df)
    best_metrics = {k: 0 for k in evaluator_metrics}
    best_index = 0
    for idx, model in enumerate(models.subModels[0]):
        with mlflow.start_run(nested=True, run_name=f"lr_{idx}") as run:
            print("\nEvaluating on test data:")
            print(f"subModel No. {idx + 1}")
            prediction, log_metric = evaluate(model, test_df)

            if log_metric[evaluator_metrics[0]] > best_metrics[evaluator_metrics[0]]:
                best_metrics = log_metric
                best_index = idx

            print("log model")
            mlflow.spark.log_model(
                model,
                f"{EXPERIMENT_NAME}-lrmodel",
                registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
                dfs_tmpdir="Files/spark",
            )

            print("log metrics")
            mlflow.log_metrics(log_metric)

            print("log parameters")
            mlflow.log_params(
                {
                    "word2vec_size": word2vec_size,
                    "min_word_count": min_word_count,
                    "max_iter": max_iter,
                    "k_folds": k_folds,
                    "DATA_FILE": DATA_FILE,
                }
            )

    # Log the best model and its relevant metrics and parameters to the parent run
    mlflow.spark.log_model(
        models.subModels[0][best_index],
        f"{EXPERIMENT_NAME}-lrmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
        dfs_tmpdir="Files/spark",
    )
    mlflow.log_metrics(best_metrics)
    mlflow.log_params(
        {
            "word2vec_size": word2vec_size,
            "min_word_count": min_word_count,
            "max_iter": max_iter,
            "k_folds": k_folds,
            "DATA_FILE": DATA_FILE,
        }
    )

要檢視您的實驗:

  1. 在左側導覽中,選取您的工作區
  2. 尋找並選取實驗名稱 - 在此案例中為 sample_aisample-textclassification

實驗的螢幕擷取畫面。

步驟 5:評分並儲存預測結果

Microsoft Fabric 允許使用者將機器學習模型運用化,並具備可擴展 PREDICT 功能。 此函數支援在任何計算引擎進行批次評分 (或批次推斷)。 你可以直接從筆記本或特定模型的項目頁面建立批次預測。 欲了解更多關於此功能 PREDICT 及如何在 Fabric 中使用的資訊,請造訪 Microsoft Fabric 中的 PREDICT 機器學習模型評分

根據我們的評估結果,模型1在 Precision-Recall 曲線下面積(AUPRC)及曲線下面積接收工作特徵(AUC-ROC)方面均擁有最大的指標。 因此,您應使用模型 1 進行預測。

AUC-ROC 量值通常用於度量二進位分類器效能。 不過,根據 AUPRC 度量來評估分類器有時會更合適。 AUC-ROC 圖表會視覺化確判率 (TPR) 與誤判率 (FPR) 之間的取捨。 AUPRC曲線結合了精確度(陽性預測值,PPV)與回憶率(真陽性率,TPR)於單一視覺化中。 以下程式碼摘要涵蓋這些步驟:

# Load the best model
model_uri = f"models:/{EXPERIMENT_NAME}-lrmodel/1"
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

# Verify the loaded model
batch_predictions = loaded_model.transform(test_df)
batch_predictions.show(5)
# Code to save userRecs in the lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")