Training e valutazione di un modello di classificazione del testo in Microsoft Fabric
In questo notebook viene illustrato come risolvere un'attività di classificazione del testo con il modello word2vec + linear-regression in Spark.
Importante
Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.
Il set di dati di esempio è costituito da metadati relativi ai libri digitalizzati dalla British Library in collaborazione con Microsoft. Include etichette generate dall'uomo per classificare un libro come 'fiction' o 'non-fiction'. Questo set di dati viene usato per eseguire il training di un modello per la classificazione dei generi che stima se un libro è 'fiction' o 'non-fiction' in base al titolo.
BL record ID | Tipo di risorsa | Nome | Date associate al nome | Tipo di nome | Ruolo | Tutti i nomi | Titolo | Titoli varianti | Titolo della serie | Numero all'interno della serie | Paese di pubblicazione | Luogo di pubblicazione | Publisher | Data di pubblicazione | Edizione | Descrizione fisica | Classificazione dewey | Indicatore di mensola BL | Argomenti | Genre | Linguaggi | Note | ID record BL per la risorsa fisica | classification_id | user_id | created_at | subject_ids | annotator_date_pub | annotator_normalised_date_pub | annotator_edition_statement | annotator_genre | annotator_FAST_genre_terms | annotator_FAST_subject_terms | annotator_comments | annotator_main_language | annotator_other_languages_summaries | annotator_summaries_language | annotator_translation | annotator_original_language | annotator_publisher | annotator_place_pub | annotator_country | annotator_title | Collegamento a libro digitalizzato | Annotato |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
014602826 | Monografia | Yearsley, Ann | 1753-1806 | persona | Più, Hannah, 1745-1833 [persona]; Yearsley, Ann, 1753-1806 [person] | Poesie in diverse occasioni [Con una lettera prefatoria di Hannah More.] | Inghilterra | Londra | 1786 | Quarta edizione MANUSCRIPT nota | Digital Store 11644.d.32 | Inglese | 003996603 | Falso | |||||||||||||||||||||||||||||||
014602830 | Monografia | A, T. | persona | Oldham, John, 1653-1683 [persona]; A, T. [person] | Un Satyr contro Vertue. (Una poesia: doveva essere pronunciata da un Town-Hector [di John Oldham. Prefazione firmata: T. A.]) | Inghilterra | Londra | 1679 | 15 pagine (4°) | Digital Store 11602.ee.10. (2.) | Inglese | 000001143 | Falso |
Passaggio 1: Caricare i dati
Configurazioni dei notebook
Installare le librerie
In questo notebook è wordcloud
necessario installare prima di tutto . Il kernel PySpark verrà riavviato dopo %pip install
, quindi è necessario installarlo prima di eseguire qualsiasi altra cella. Word Cloud è un pacchetto che consente di usare una tecnica di visualizzazione dei dati per rappresentare i dati di testo per indicare la frequenza in una determinata parte di testo.
# install wordcloud for text visualization
%pip install wordcloud
Definendo i parametri seguenti, è possibile applicare facilmente questo notebook a set di dati diversi.
IS_CUSTOM_DATA = False # if True, dataset has to be uploaded manually by user
DATA_FOLDER = "Files/title-genre-classification"
DATA_FILE = "blbooksgenre.csv"
# data schema
TEXT_COL = "Title"
LABEL_COL = "annotator_genre"
LABELS = ["Fiction", "Non-fiction"]
EXPERIMENT_NAME = "aisample-textclassification" # mlflow experiment name
Vengono definiti anche alcuni iper parametri per il training del modello. (NON modificare questi parametri a meno che non si sia consapevoli del significato).
# hyper-params
word2vec_size = 128
min_word_count = 3
max_iter = 10
k_folds = 3
Importare le dipendenze
import numpy as np
from itertools import chain
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import (
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator,
)
from synapse.ml.stages import ClassBalancer
from synapse.ml.train import ComputeModelStatistics
import mlflow
Scaricare il set di dati e caricare in Lakehouse
Aggiungere un lakehouse al notebook prima di eseguirlo.
if not IS_CUSTOM_DATA:
# Download demo data files into lakehouse if not exist
import os, requests
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/Title_Genre_Classification"
fname = "blbooksgenre.csv"
download_path = f"/lakehouse/default/{DATA_FOLDER}/raw"
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError(
"Default lakehouse not found, please add a lakehouse and restart the session."
)
os.makedirs(download_path, exist_ok=True)
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Leggere i dati da Lakehouse
raw_df = spark.read.csv(f"{DATA_FOLDER}/raw/{DATA_FILE}", header=True, inferSchema=True)
display(raw_df.limit(20))
Passaggio 2: Pre-elaborare i dati
Pulizia dei dati
df = (
raw_df.select([TEXT_COL, LABEL_COL])
.where(F.col(LABEL_COL).isin(LABELS))
.dropDuplicates([TEXT_COL])
.cache()
)
display(df.limit(20))
Gestire i dati non bilanciati
cb = ClassBalancer().setInputCol(LABEL_COL)
df = cb.fit(df).transform(df)
display(df.limit(20))
Tokenize
## text transformer
tokenizer = Tokenizer(inputCol=TEXT_COL, outputCol="tokens")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
## build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])
token_df = pipeline.fit(df).transform(df)
display(token_df.limit(20))
Visualizzazione
Visualizzare un oggetto Wordcloud per ogni classe.
# WordCloud
for label in LABELS:
tokens = (
token_df.where(F.col(LABEL_COL) == label)
.select(F.explode("filtered_tokens").alias("token"))
.where(F.col("token").rlike(r"^\w+$"))
)
top50_tokens = (
tokens.groupBy("token").count().orderBy(F.desc("count")).limit(50).collect()
)
# Generate a word cloud image
wordcloud = WordCloud(
scale=10,
background_color="white",
random_state=42, # Make sure the output is always the same for the same input
).generate_from_frequencies(dict(top50_tokens))
# Display the generated image the matplotlib way:
plt.figure(figsize=(10, 10))
plt.title(label, fontsize=20)
plt.axis("off")
plt.imshow(wordcloud, interpolation="bilinear")
Vettorizza
Usiamo word2vec per vettorizzare il testo.
## label transformer
label_indexer = StringIndexer(inputCol=LABEL_COL, outputCol="labelIdx")
vectorizer = Word2Vec(
vectorSize=word2vec_size,
minCount=min_word_count,
inputCol="filtered_tokens",
outputCol="features",
)
## build the pipeline
pipeline = Pipeline(stages=[label_indexer, vectorizer])
vec_df = (
pipeline.fit(token_df)
.transform(token_df)
.select([TEXT_COL, LABEL_COL, "features", "labelIdx", "weight"])
)
display(vec_df.limit(20))
Passaggio 3: Training e valutazione del modello
Il set di dati è stato pulito, gestito con dati sbilanciati, tokenizzato il testo, la nuvola di parole visualizzata e il testo vettorializzato.
Successivamente, viene eseguito il training di un modello di regressione lineare per classificare il testo vettorializzato.
Dividere il set di dati in training e test
(train_df, test_df) = vec_df.randomSplit((0.8, 0.2), seed=42)
Creare il modello
lr = (
LogisticRegression()
.setMaxIter(max_iter)
.setFeaturesCol("features")
.setLabelCol("labelIdx")
.setWeightCol("weight")
)
Eseguire il training del modello con la convalida incrociata
param_grid = (
ParamGridBuilder()
.addGrid(lr.regParam, [0.03, 0.1, 0.3])
.addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
.build()
)
if len(LABELS) > 2:
evaluator_cls = MulticlassClassificationEvaluator
evaluator_metrics = ["f1", "accuracy"]
else:
evaluator_cls = BinaryClassificationEvaluator
evaluator_metrics = ["areaUnderROC", "areaUnderPR"]
evaluator = evaluator_cls(labelCol="labelIdx", weightCol="weight")
crossval = CrossValidator(
estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=k_folds
)
model = crossval.fit(train_df)
Valutare il modello
predictions = model.transform(test_df)
display(predictions)
log_metrics = {}
for metric in evaluator_metrics:
value = evaluator.evaluate(predictions, {evaluator.metricName: metric})
log_metrics[metric] = value
print(f"{metric}: {value:.4f}")
metrics = ComputeModelStatistics(
evaluationMetric="classification", labelCol="labelIdx", scoredLabelsCol="prediction"
).transform(predictions)
display(metrics)
# collect confusion matrix value
cm = metrics.select("confusion_matrix").collect()[0][0].toArray()
print(cm)
# plot confusion matrix
sns.set(rc={"figure.figsize": (6, 4.5)})
ax = sns.heatmap(cm, annot=True, fmt=".20g")
ax.set_title("Confusion Matrix")
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
Registrare e caricare il modello con MLflow
Ora che è disponibile un modello con cui si è soddisfatti, è possibile salvarlo per usarlo in un secondo momento. In questo caso si usa MLflow per registrare metriche/modelli e caricare i modelli per la stima.
# setup mlflow
mlflow.set_experiment(EXPERIMENT_NAME)
# log model, metrics and params
with mlflow.start_run() as run:
print("log model:")
mlflow.spark.log_model(
model,
f"{EXPERIMENT_NAME}-lrmodel",
registered_model_name=f"{EXPERIMENT_NAME}-lrmodel",
dfs_tmpdir="Files/spark",
)
print("log metrics:")
mlflow.log_metrics(log_metrics)
print("log parameters:")
mlflow.log_params(
{
"word2vec_size": word2vec_size,
"min_word_count": min_word_count,
"max_iter": max_iter,
"k_folds": k_folds,
"DATA_FILE": DATA_FILE,
}
)
model_uri = f"runs:/{run.info.run_id}/{EXPERIMENT_NAME}-lrmodel"
print("Model saved in run %s" % run.info.run_id)
print(f"Model URI: {model_uri}")
# load model back
loaded_model = mlflow.spark.load_model(model_uri, dfs_tmpdir="Files/spark")
# verify loaded model
predictions = loaded_model.transform(test_df)
display(predictions)