Clasificación: antes y después de SynapseML

Requisitos previos

  • Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Agregar para agregar un lago existente o crear un lago.

Introducción

En este tutorial, realizamos la misma tarea de clasificación de dos maneras diferentes: una vez usando plain pyspark y una vez mediante la synapseml biblioteca. Los dos métodos producen el mismo rendimiento, pero una de las dos bibliotecas es drásticamente más sencilla de usar e iterar en (¿puede adivinar cuál?).

La tarea es sencilla: predecir si la revisión de un usuario de un libro vendido en Amazon es buena (calificación > 3) o incorrecta en función del texto de la revisión. Lo logramos entrenando aprendices de LogisticRegression con diferentes hiperparámetros y eligiendo el mejor modelo.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lectura de los datos

Descargamos y leemos en los datos.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extracción de más características y procesamiento de datos

Sin embargo, los datos reales son más complejos que el conjunto de datos anterior. Es habitual que un conjunto de datos tenga características de varios tipos: texto, numérico y categórico. Para ilustrar lo difícil que es trabajar con estos conjuntos de datos, agregamos dos características numéricas al conjunto de datos: el recuento de palabras de la revisión y la longitud media de la palabra.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Clasificación mediante pyspark

Para elegir el mejor clasificador LogisticRegression mediante la pyspark biblioteca, debe realizar explícitamente los pasos siguientes:

  1. Procese las características:
    • Tokenización de la columna de texto
    • Aplicar un algoritmo hash a la columna tokenizada en un vector mediante hash
    • Combinar las características numéricas con el vector
  2. Procesar la columna de etiqueta: convertirla en el tipo adecuado.
  3. Entrenamiento de varios algoritmos LogisticRegression en el train conjunto de datos con distintos hiperparámetros
  4. Calcule el área bajo la curva ROC para cada uno de los modelos entrenados y seleccione el modelo con la métrica más alta calculada en el test conjunto de datos.
  5. Evaluar el mejor modelo del validation conjunto

Como puede ver, hay un gran trabajo implicado y muchos pasos donde algo puede ir mal.

from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Clasificación mediante synapseml

La vida es mucho más sencilla cuando se usa synapseml!

  1. El TrainClassifier estimador presenta los datos internamente, siempre que las columnas seleccionadas en el trainconjunto de datos , test, validation representen las características.

  2. El FindBestModel estimador busca el mejor modelo de un grupo de modelos entrenados mediante la búsqueda del modelo que mejor funciona en el test conjunto de datos según la métrica especificada.

  3. El ComputeModelStatistics transformador calcula las distintas métricas en un conjunto de datos puntuado (en nuestro caso, el validation conjunto de datos) al mismo tiempo.

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Pasos siguientes