Tâches de classification avec SynapseML

Dans cet article, vous effectuez la même tâche de classification de deux manières différentes : une fois en utilisant plain pyspark et une fois en utilisant la bibliothèque synapseml. Les deux méthodes donnent les mêmes performances, mais mettent en évidence la simplicité d'utilisation synapseml par rapport à pyspark.

La tâche consiste à prédire si l'avis d'un client sur un livre vendu sur Amazon est bon (note > 3) ou mauvais en fonction du texte de l'avis. Vous y parvenez en formant les apprenants LogisticRegression avec différents hyperparamètres et en choisissant le meilleur modèle.

Prérequis

Attachez votre cahier à une cabane au bord du lac. Sur le côté gauche, sélectionnez Ajouter pour ajouter une maison de lac existante ou créer une maison de lac.

Programme d’installation

Importez les bibliothèques Python nécessaires et obtenez une session Spark.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lire les données

Téléchargez et lisez les données.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extraire des fonctionnalités et traiter des données

Les données réelles sont plus complexes que l'ensemble de données ci-dessus. Il est courant qu'un jeu de données ait des fonctionnalités de plusieurs types, telles que textuelles, numériques et catégorielles. Pour illustrer à quel point il est difficile de travailler avec ces ensembles de données, ajoutez deux caractéristiques numériques à l'ensemble de données : le nombre de mots de l'avis et la longueur moyenne des mots.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Classifier à l'aide de pyspark

Pour choisir le meilleur classificateur LogisticRegression à l'aide de la bibliothèque pyspark, vous devez explicitement effectuer les étapes suivantes :

  1. Traiter les fonctionnalités :
    • Tokeniser la colonne de texte
    • Hachez la colonne tokenisée dans un vecteur en utilisant le hachage
    • Fusionner les caractéristiques numériques avec le vecteur
  2. Traitez la colonne d'étiquette : convertissez-la dans le type approprié.
  3. Former plusieurs algorithmes LogisticRegression sur l'ensemble de données train avec différents hyperparamètres
  4. Calculez l'aire sous la courbe ROC pour chacun des modèles entraînés et sélectionnez le modèle avec la métrique la plus élevée calculée sur l'ensemble de données test
  5. Évaluez le meilleur modèle sur le plateau validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Classifier à l'aide de SynapseML

Les étapes nécessaires avec synapseml sont plus simples :

  1. L'estimateur TrainClassifier caractérise les données en interne, tant que les colonnes sélectionnées dans l'ensemble de données train, test, validation représentent les caractéristiques

  2. L'estimateur FindBestModel trouve le meilleur modèle à partir d'un pool de modèles formés en trouvant le modèle qui fonctionne le mieux sur l'ensemble de données test tenu de l’indicateur de performance spécifié

  3. Le transformateur ComputeModelStatistics calcule les différentes métriques sur un jeu de données noté (dans notre cas, le jeu de données validation) en même temps

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)