Clasificación: antes y después de SynapseML
Requisitos previos
- Adjunte el bloc de notas a una casa de lago. En el lado izquierdo, seleccione Agregar para agregar un lago existente o crear un lago.
Introducción
En este tutorial, realizamos la misma tarea de clasificación de dos maneras diferentes: una vez usando plain pyspark
y una vez mediante la synapseml
biblioteca. Los dos métodos producen el mismo rendimiento, pero una de las dos bibliotecas es drásticamente más sencilla de usar e iterar en (¿puede adivinar cuál?).
La tarea es sencilla: predecir si la revisión de un usuario de un libro vendido en Amazon es buena (calificación > 3) o incorrecta en función del texto de la revisión. Lo logramos entrenando aprendices de LogisticRegression con diferentes hiperparámetros y eligiendo el mejor modelo.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Lectura de los datos
Descargamos y leemos en los datos.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Extracción de más características y procesamiento de datos
Sin embargo, los datos reales son más complejos que el conjunto de datos anterior. Es habitual que un conjunto de datos tenga características de varios tipos: texto, numérico y categórico. Para ilustrar lo difícil que es trabajar con estos conjuntos de datos, agregamos dos características numéricas al conjunto de datos: el recuento de palabras de la revisión y la longitud media de la palabra.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Clasificación mediante pyspark
Para elegir el mejor clasificador LogisticRegression mediante la pyspark
biblioteca, debe realizar explícitamente los pasos siguientes:
- Procese las características:
- Tokenización de la columna de texto
- Aplicar un algoritmo hash a la columna tokenizada en un vector mediante hash
- Combinar las características numéricas con el vector
- Procesar la columna de etiqueta: convertirla en el tipo adecuado.
- Entrenamiento de varios algoritmos LogisticRegression en el
train
conjunto de datos con distintos hiperparámetros - Calcule el área bajo la curva ROC para cada uno de los modelos entrenados y seleccione el modelo con la métrica más alta calculada en el
test
conjunto de datos. - Evaluar el mejor modelo del
validation
conjunto
Como puede ver, hay un gran trabajo implicado y muchos pasos donde algo puede ir mal.
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Clasificación mediante synapseml
La vida es mucho más sencilla cuando se usa synapseml
!
El
TrainClassifier
estimador presenta los datos internamente, siempre que las columnas seleccionadas en eltrain
conjunto de datos ,test
,validation
representen las características.El
FindBestModel
estimador busca el mejor modelo de un grupo de modelos entrenados mediante la búsqueda del modelo que mejor funciona en eltest
conjunto de datos según la métrica especificada.El
ComputeModelStatistics
transformador calcula las distintas métricas en un conjunto de datos puntuado (en nuestro caso, elvalidation
conjunto de datos) al mismo tiempo.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)