Del via


Klassificeringsopgaver ved hjælp af SynapseML

I denne artikel kan du se, hvordan du udfører en bestemt klassificeringsopgave med to metoder. Én metode bruger almindelig pyspark, og én metode bruger biblioteket synapseml . Selvom metoderne giver den samme ydeevne, fremhæver de enkelheden af synapseml sammenlignet med pyspark.

Den opgave, der er beskrevet i denne artikel, forudsiger, om en bestemt kundeanmeldelse af en bog, der er solgt på Amazon, er god (bedømmelse > 3) eller ej, baseret på korrekturteksten. Hvis du vil bygge opgaven, skal du oplære LogisticRegression-elever med forskellige hyperparametre og derefter vælge den bedste model.

Forudsætninger

Vedhæft din notesbog til et lakehouse. I venstre side kan du vælge Tilføj for at tilføje et eksisterende lakehouse, eller du kan oprette et nyt lakehouse.

Opsætte

Importér de nødvendige Python-biblioteker, og få en Spark-session:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Læs dataene

Download, og læs i dataene:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Udtræk funktioner og behandl data

Reelle data har større kompleksitet sammenlignet med det datasæt, vi downloadede tidligere. Et datasæt har ofte funktioner af flere typer – f.eks. tekst, numerisk og kategorisk. Hvis du vil vise vanskelighederne med at arbejde med disse datasæt, skal du føje to numeriske funktioner til datasættet: antallet af ord i korrekturen og den gennemsnitlige ordlængde:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klassificer ved hjælp af pyspark

Hvis du vil vælge den bedste LogisticRegression-klassificeringsklassificering ved hjælp af pyspark biblioteket, skal du eksplicit udføre disse trin:

  1. Behandl funktionerne
    • Tokenize tekstkolonnen
    • Hash den tokeniserede kolonne til en vektor ved hjælp af hashing
    • Flet de numeriske funktioner med vektoren
  2. Hvis du vil behandle navnekolonnen, skal du konvertere kolonnen til den korrekte type
  3. Oplær flere LogisticRegression-algoritmer i train datasættet med forskellige hyperparametre
  4. Beregn området under ROC-kurven for hver af de oplærte modeller, og vælg modellen med den højeste metrikværdi som beregnet på test datasættet
  5. Evaluer den bedste model på validation sættet
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klassificer ved hjælp af SynapseML

Indstillingen synapseml omfatter enklere trin:

  1. TrainClassifier Estimator viser internt dataene, så længe de kolonner, der er valgt i traindatasættet , testvalidation repræsenterer funktionerne

  2. FindBestModel Estimator finder den bedste model fra en pulje af oplærte modeller. For at gøre dette finder den model, der fungerer bedst på test datasættet, ud fra den angivne metrikværdi

  3. ComputeModelStatistics Transformeren beregner de forskellige målepunkter for et scoret datasæt (i vores tilfælde validation datasættet) på samme tid

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)