Klassifizierungsaufgaben mit SynapseML

In diesem Artikel führen Sie dieselbe Klassifizierungsaufgabe auf zwei unterschiedliche Arten aus: einmal einfach mit pyspark und einmal mit der synapseml-Bibliothek. Die beiden Methoden erzielen die gleiche Leistung, jedoch wird die Einfachheit der Verwendung von synapseml im Vergleich zu pyspark hervorgehoben.

Die Aufgabe besteht darin, basierend auf dem Text einer Buchrezension vorherzusagen, ob die Bewertung dieses bei Amazon verkauften Buches gut (Bewertung > 3) oder schlecht ist. Sie erreichen dies, indem Sie LogisticRegression-Learner mit verschiedenen Hyperparametern trainieren und das beste Modell auswählen.

Voraussetzungen

Fügen Sie Ihr Notebook an ein Lakehouse an. Wählen Sie auf der linken Seite Hinzufügen aus, um ein vorhandenes Lakehouse hinzuzufügen oder ein Lakehouse zu erstellen.

Setup

Importieren Sie die erforderlichen Python-Bibliotheken, und rufen Sie eine Spark-Sitzung ab.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lesen der Daten

Laden Sie die Daten herunter, und lesen Sie sie ein.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrahieren von Features und Verarbeiten der Daten

Echte Daten sind komplexer als das oben angegebene Dataset. Meist weist ein Dataset Features mit mehreren Typen auf, z. B. textlich, numerisch und kategorisch. Um zu veranschaulichen, wie schwierig die Arbeit mit diesen Datasets ist, fügen Sie dem Dataset zwei numerische Features hinzu: die Wortanzahl der Rezension und die mittlere Wortlänge.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassifizieren mithilfe von PySpark

Um den besten LogisticRegression-Klassifizierer mithilfe der pyspark-Bibliothek auszuwählen, müssen Sie die folgenden Schritte explizit ausführen:

  1. Verarbeiten der Features:
    • Tokenisieren der Textspalte
    • Bilden des Hashs der tokenisierten Spalte als Vektor mithilfe von Hashing
    • Zusammenführen der numerischen Features mit dem Vektor
  2. Verarbeiten der Bezeichnungsspalte: Umwandlung in den richtigen Typ
  3. Trainieren mehrerer LogisticRegression-Algorithmen für das train-Dataset mit verschiedenen Hyperparametern
  4. Berechnen der Fläche unter der ROC-Kurve für jedes trainierte Modell und Auswählen des Modells mit der höchsten Metrik, wie für das test-Dataset berechnet
  5. Bewerten des besten Modells mit dem validation-Satz
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassifizieren mithilfe von SynapseML

Die erforderlichen Schritte mit synapseml sind einfacher:

  1. Der Schätzer TrainClassifier wandelt die Daten intern in Features um, solange die in den Datasets train, test, validation ausgewählten Spalten die Features darstellen.

  2. Der Schätzer FindBestModel sucht das beste Modell in einem Pool trainierter Modelle, indem das Modell ermittelt wird, das für das test-Dataset unter Berücksichtigung der angegebenen Metrik am besten funktioniert.

  3. Die Transformation ComputeModelStatistics berechnet zur gleichen Zeit die verschiedenen Metriken für ein bewertetes Dataset (in unserem Fall das validation-Dataset).

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)