Klassifiseringsoppgaver ved hjelp av SynapseML

I denne artikkelen utfører du den samme klassifiseringsoppgaven på to forskjellige måter: når du bruker vanlig pyspark og én synapseml gang ved hjelp av biblioteket. De to metodene gir samme ytelse, men fremhever enkelheten ved å pysparkbruke synapseml sammenlignet med .

Oppgaven er å forutsi om en kundes gjennomgang av en bok solgt på Amazon er god (rating > 3) eller dårlig basert på teksten i anmeldelsen. Du oppnår det ved å lære logisticregression-elever med forskjellige hyperparametere og velge den beste modellen.

Forutsetning

Legg notatblokken til et lakehouse. På venstre side velger du Legg til for å legge til et eksisterende innsjøhus eller opprette et innsjøhus.

Oppsett

Importer nødvendige Python-biblioteker og få en spark-økt.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Les dataene

Last ned og les i dataene.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Trekke ut funksjoner og behandle data

Reelle data er mer komplekse enn datasettet ovenfor. Det er vanlig at et datasett har funksjoner av flere typer, for eksempel tekst, numerisk og kategorisk. Hvis du vil illustrere hvor vanskelig det er å arbeide med disse datasettene, kan du legge til to numeriske funksjoner i datasettet: ordtellingen for gjennomgangen og gjennomsnittlig ordlengde.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassifisere ved hjelp av pyspark

Hvis du vil velge den beste LogisticRegression-klassifikatoren pyspark ved hjelp av biblioteket, må du eksplisitt utføre følgende trinn:

  1. Behandle funksjonene:
    • Tokenize tekstkolonnen
    • Hash den tokeniserte kolonnen til en vektor ved hjelp av hashing
    • Slå sammen de numeriske funksjonene med vektoren
  2. Behandle etikettkolonnen: Kast den inn i riktig type.
  3. Lære opp flere LogisticRegression-algoritmer på train datasettet med ulike hyperparametere
  4. Beregne området under ROC-kurven for hver av de opplærte modellene, og velg modellen med høyest metrikkverdi som beregnet på test datasettet
  5. Evaluer den beste modellen på validation settet
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassifisere ved hjelp av SynapseML

Trinnene som kreves med synapseml , er enklere:

  1. Estimatoren TrainClassifier inneholder dataene internt, så lenge kolonnene som er valgt i traindatasettet , testvalidation representerer funksjonene

  2. FindBestModel Estimator finner den beste modellen fra et utvalg av opplærte modeller ved å finne modellen som fungerer best på test datasettet, gitt den angitte metrikkverdien

  3. Transformatoren ComputeModelStatistics beregner de ulike måledataene på et datasett (i vårt tilfelle validation datasettet) samtidig

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)