Compartir vía


Tareas de clasificación con SynapseML

En este artículo, realizará la misma tarea de clasificación de dos maneras diferentes: una vez con pyspark simple y una vez con la biblioteca synapseml. Los dos métodos producen el mismo rendimiento, pero destaca la simplicidad de usar synapseml en comparación con pyspark.

La tarea consiste en predecir si la reseña de un cliente sobre un libro vendido en Amazon es buena (calificación > 3) o mala basándose en el texto de la reseña. Para ello, puede entrenar aprendices de LogisticRegression con distintos hiperparámetros y elegir el mejor modelo.

Requisitos previos

Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago existente o crear uno.

Configuración

Importar las bibliotecas de Python necesarias y obtener una sesión de Spark.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Lectura de los datos

Descargar y leer datos.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extraer características y procesar datos

Los datos reales son más complejos que el conjunto de datos anterior. Es habitual que un conjunto de datos contenga características de varios tipos, como texto, numéricas y categóricas. Para ilustrar lo difícil que es trabajar con estos conjuntos de datos, agregue dos características numéricas al conjunto de datos: el número de palabras de la reseña y la longitud media de las palabras.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Clasificación mediante pyspark

Para elegir el mejor clasificador LogisticRegression mediante la biblioteca pyspark, debe realizar explícitamente los pasos siguientes:

  1. Procese las características:
    • Tokenizar la columna de texto
    • Aplicar hash a la columna tokenizada en un vector mediante hash
    • Combinar las características numéricas con el vector
  2. Procesar la columna de etiqueta: convertirla en el tipo adecuado.
  3. Entrenar varios algoritmos LogisticRegression en el conjunto de datos train con distintos hiperparámetros
  4. Calcule el área bajo la curva ROC para cada uno de los modelos entrenados y seleccione el modelo con la métrica más alta calculada en el conjunto de datos test
  5. Evaluar el mejor modelo en el conjunto validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Clasificación mediante SynapseML

Los pasos necesarios con synapseml son más sencillos:

  1. El estimador TrainClassifier presenta los datos internamente, siempre y cuando las columnas seleccionadas en el conjunto de datos train, test, validation representen las características

  2. El estimador FindBestModel encuentra el mejor modelo de entre un grupo de modelos entrenados mediante la búsqueda del modelo que funciona mejor en el conjunto de datos test según la métrica especificada

  3. El transformador ComputeModelStatistics calcula las distintas métricas en un conjunto de datos puntuado (en nuestro caso, el conjunto de datos validation) al mismo tiempo

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)