Training e valutazione di un modello di classificazione del testo in Microsoft Fabric

In questo notebook viene illustrato come risolvere un'attività di classificazione del testo con il modello word2vec + linear-regression in Spark.

Importante

Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.

Il set di dati di esempio è costituito da metadati relativi ai libri digitalizzati dalla British Library in collaborazione con Microsoft. Include etichette generate dall'uomo per classificare un libro come 'fiction' o 'non-fiction'. Questo set di dati viene usato per eseguire il training di un modello per la classificazione dei generi che stima se un libro è 'fiction' o 'non-fiction' in base al titolo.

BL record ID Tipo di risorsa Nome Date associate al nome Tipo di nome Ruolo Tutti i nomi Titolo Titoli varianti Titolo della serie Numero all'interno della serie Paese di pubblicazione Luogo di pubblicazione Publisher Data di pubblicazione Edizione Descrizione fisica Classificazione dewey Indicatore di mensola BL Argomenti Genre Linguaggi Note ID record BL per la risorsa fisica classification_id user_id created_at subject_ids annotator_date_pub annotator_normalised_date_pub annotator_edition_statement annotator_genre annotator_FAST_genre_terms annotator_FAST_subject_terms annotator_comments annotator_main_language annotator_other_languages_summaries annotator_summaries_language annotator_translation annotator_original_language annotator_publisher annotator_place_pub annotator_country annotator_title Collegamento a libro digitalizzato Annotato
014602826 Monografia Yearsley, Ann 1753-1806 persona Più, Hannah, 1745-1833 [persona]; Yearsley, Ann, 1753-1806 [person] Poesie in diverse occasioni [Con una lettera prefatoria di Hannah More.] Inghilterra Londra 1786 Quarta edizione MANUSCRIPT nota Digital Store 11644.d.32 Inglese 003996603 Falso
014602830 Monografia A, T. persona Oldham, John, 1653-1683 [persona]; A, T. [person] Un Satyr contro Vertue. (Una poesia: doveva essere pronunciata da un Town-Hector [di John Oldham. Prefazione firmata: T. A.]) Inghilterra Londra 1679 15 pagine (4°) Digital Store 11602.ee.10. (2.) Inglese 000001143 Falso

Passaggio 1: Caricare i dati

Configurazioni dei notebook

Installare le librerie

In questo notebook è wordcloudnecessario installare prima di tutto . Il kernel PySpark verrà riavviato dopo %pip install, quindi è necessario installarlo prima di eseguire qualsiasi altra cella. Word Cloud è un pacchetto che consente di usare una tecnica di visualizzazione dei dati per rappresentare i dati di testo per indicare la frequenza in una determinata parte di testo.

# install wordcloud for text visualization
%pip install wordcloud

Definendo i parametri seguenti, è possibile applicare facilmente questo notebook a set di dati diversi.

IS_CUSTOM_DATA = False  # if True, dataset has to be uploaded manually by user
DATA_FOLDER = "Files/title-genre-classification"
DATA_FILE = "blbooksgenre.csv"

# data schema
TEXT_COL = "Title"
LABEL_COL = "annotator_genre"
LABELS = ["Fiction", "Non-fiction"]

EXPERIMENT_NAME = "aisample-textclassification"  # mlflow experiment name

Vengono definiti anche alcuni iper parametri per il training del modello. (NON modificare questi parametri a meno che non si sia consapevoli del significato).

# hyper-params
word2vec_size = 128
min_word_count = 3
max_iter = 10
k_folds = 3

Importare le dipendenze

import numpy as np
from itertools import chain

from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)

from synapse.ml.stages import ClassBalancer
from synapse.ml.train import ComputeModelStatistics

import mlflow

Scaricare il set di dati e caricare in Lakehouse

Aggiungere un lakehouse al notebook prima di eseguirlo.

if not IS_CUSTOM_DATA:
    # Download demo data files into lakehouse if not exist
    import os, requests

    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification"
    fname = "blbooksgenre.csv"
    download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"

    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError(
            "Default lakehouse not found, please add a lakehouse and restart the session."
        )
    os.makedirs(download_path, exist_ok=True)
    if not os.path.exists(f"{download_path}/{fname}"):
        r = requests.get(f"{remote_url}/{fname}", timeout=30)
        with open(f"{download_path}/{fname}", "wb") as f:
            f.write(r.content)
    print("Downloaded demo data files into lakehouse.")

Leggere i dati da Lakehouse

raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True)

display(raw_df.limit(20))

Passaggio 2: Pre-elaborare i dati

Pulizia dei dati

df = (
    raw_df.select([TEXT_COL, LABEL_COL])
    .where(F.col(LABEL_COL).isin(LABELS))
    .dropDuplicates([TEXT_COL])
    .cache()
)

display(df.limit(20))

Gestire i dati non bilanciati

cb = ClassBalancer().setInputCol(LABEL_COL)

df = cb.fit(df).transform(df)
display(df.limit(20))

Tokenize

## text transformer
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

## build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])

token_df = pipeline.fit(df).transform(df)

display(token_df.limit(20))

Visualizzazione

Visualizzare un oggetto Wordcloud per ogni classe.

# WordCloud
for label in LABELS:
    tokens = (
        token_df.where(F.col(LABEL_COL) == label)
        .select(F.explode("filtered_tokens").alias("token"))
        .where(F.col("token").rlike(r"^\w+$"))
    )

    top50_tokens = (
        tokens.groupBy("token").count().orderBy(F.desc("count")).limit(50).collect()
    )

    # Generate a word cloud image
    wordcloud = WordCloud(
        scale=10,
        background_color="white",
        random_state=42,  # Make sure the output is always the same for the same input
    ).generate_from_frequencies(dict(top50_tokens))

    # Display the generated image the matplotlib way:
    plt.figure(figsize=(10, 10))
    plt.title(label, fontsize=20)
    plt.axis("off")
    plt.imshow(wordcloud, interpolation="bilinear")

Vettorizza

Usiamo word2vec per vettorizzare il testo.

## label transformer
label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol="labelIdx")
vectorizer = Word2Vec(
    vectorSize=word2vec_size,
    minCount=min_word_count,
    inputCol="filtered_tokens",
    outputCol="features",
)

## build the pipeline
pipeline = Pipeline(stages=[label_indexer, vectorizer])
vec_df = (
    pipeline.fit(token_df)
    .transform(token_df)
    .select([TEXT_COL, LABEL_COL, "features", "labelIdx", "weight"])
)

display(vec_df.limit(20))

Passaggio 3: Training e valutazione del modello

Il set di dati è stato pulito, gestito con dati sbilanciati, tokenizzato il testo, la nuvola di parole visualizzata e il testo vettorializzato.

Successivamente, viene eseguito il training di un modello di regressione lineare per classificare il testo vettorializzato.

Dividere il set di dati in training e test

(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)

Creare il modello

lr = (
    LogisticRegression()
    .setMaxIter(max_iter)
    .setFeaturesCol("features")
    .setLabelCol("labelIdx")
    .setWeightCol("weight")
)

Eseguire il training del modello con la convalida incrociata

param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.03, 0.1, 0.3])
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
    .build()
)

if len(LABELS) > 2:
    evaluator_cls = MulticlassClassificationEvaluator
    evaluator_metrics = ["f1", "accuracy"]
else:
    evaluator_cls = BinaryClassificationEvaluator
    evaluator_metrics = ["areaUnderROC", "areaUnderPR"]
evaluator = evaluator_cls(labelCol="labelIdx", weightCol="weight")

crossval = CrossValidator(
    estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=k_folds
)

model = crossval.fit(train_df)

Valutare il modello

predictions = model.transform(test_df)

display(predictions)
log_metrics = {}
for metric in evaluator_metrics:
    value = evaluator.evaluate(predictions, {evaluator.metricName: metric})
    log_metrics[metric] = value
    print(f"{metric}: {value:.4f}")
metrics = ComputeModelStatistics(
    evaluationMetric="classification", labelCol="labelIdx", scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)

# plot confusion matrix
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")

Registrare e caricare il modello con MLflow

Ora che è disponibile un modello con cui si è soddisfatti, è possibile salvarlo per usarlo in un secondo momento. In questo caso si usa MLflow per registrare metriche/modelli e caricare i modelli per la stima.

# setup mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
# log model, metrics and params
with mlflow.start_run() as run:
    print("log model:")
    mlflow.spark.log_model(
        model,
        f"{EXPERIMENT_NAME}-lrmodel",
        registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
        dfs_tmpdir="Files/spark",
    )

    print("log metrics:")
    mlflow.log_metrics(log_metrics)

    print("log parameters:")
    mlflow.log_params(
        {
            "word2vec_size": word2vec_size,
            "min_word_count": min_word_count,
            "max_iter": max_iter,
            "k_folds": k_folds,
            "DATA_FILE": DATA_FILE,
        }
    )

    model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lrmodel"
    print("Model saved in run %s" % run.info.run_id)
    print(f"Model URI: {model_uri}")
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")

# verify loaded model
predictions = loaded_model.transform(test_df)
display(predictions)