Klassificeringsuppgifter med SynapseML

I den här artikeln utför du samma klassificeringsuppgift på två olika sätt: en gång med oformaterad pyspark och en gång med hjälp av synapseml biblioteket. De två metoderna ger samma prestanda, men belyser enkelheten i att pysparkanvända jämfört med synapseml .

Uppgiften är att förutsäga om en kunds recension av en bok som säljs på Amazon är bra (betyg > 3) eller dålig baserat på texten i recensionen. Du gör det genom att träna LogisticRegression-elever med olika hyperparametrar och välja den bästa modellen.

Förutsättningar

Bifoga anteckningsboken till ett sjöhus. Till vänster väljer du Lägg till för att lägga till ett befintligt sjöhus eller skapa ett sjöhus.

Ställ in

Importera nödvändiga Python-bibliotek och få en Spark-session.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Läsa data

Ladda ned och läsa i data.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrahera funktioner och bearbeta data

Verkliga data är mer komplexa än datauppsättningen ovan. Det är vanligt att en datauppsättning har funktioner av flera typer, till exempel text, numeriska och kategoriska. För att illustrera hur svårt det är att arbeta med dessa datauppsättningar lägger du till två numeriska funktioner i datamängden: antalet ord i recensionen och medelvärdet av ordlängden.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassificera med pyspark

Om du vill välja den bästa LogisticRegression-klassificeraren med hjälp av pyspark biblioteket måste du uttryckligen utföra följande steg:

  1. Bearbeta funktionerna:
    • Tokenisera textkolumnen
    • Hash den tokeniserade kolumnen till en vektor med hashing
    • Sammanfoga de numeriska funktionerna med vektorn
  2. Bearbeta etikettkolumnen: omvandla den till rätt typ.
  3. Träna flera LogisticRegression-algoritmer på datauppsättningen train med olika hyperparametrar
  4. Beräkna området under ROC-kurvan för var och en av de tränade modellerna och välj modellen med det högsta måttet som beräknats på datauppsättningen test
  5. Utvärdera den bästa modellen på validation uppsättningen
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassificera med SynapseML

De steg som behövs med synapseml är enklare:

  1. TrainClassifier Estimatorn utför funktionaliseringar av data internt, så länge kolumnerna som valts i datauppsättningen train, testrepresenterar validation funktionerna

  2. FindBestModel Estimatorn hittar den bästa modellen från en pool med tränade modeller genom att hitta den modell som presterar bäst på datamängden test givet det angivna måttet

  3. ComputeModelStatistics Transformeraren beräknar de olika måtten på en poängsatt datamängd (i vårt fall datauppsättningenvalidation) samtidigt

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)