Besorolási feladatok a SynapseML használatával

Ebben a cikkben ugyanazt a besorolási feladatot kétféleképpen hajtja végre: egyszer egyszerű pyspark és egyszer a synapseml tár használatával. A két módszer ugyanazt a teljesítményt nyújtja, de kiemeli a használat synapseml egyszerűségét.pyspark

A feladat annak előrejelzése, hogy az ügyfél által az Amazonon értékesített könyv véleménye jó (3. értékelés > ) vagy rossz-e a felülvizsgálat szövege alapján. Ezt a LogisticRegression-tanulók különböző hiperparaméterekkel való betanításával és a legjobb modell kiválasztásával érheti el.

Előfeltételek

Csatolja a jegyzetfüzetet egy tóházhoz. A bal oldalon válassza a Hozzáadás lehetőséget egy meglévő tóház hozzáadásához vagy egy tóház létrehozásához.

Beállítás

Importálja a szükséges Python-kódtárakat, és szerezzen be egy Spark-munkamenetet.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Az adatok olvasása

Töltse le és olvassa be az adatokat.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Funkciók kinyerés és adatok feldolgozása

A valós adatok összetettebbek a fenti adatkészletnél. Gyakran előfordul, hogy egy adathalmaznak több típusa van, például szöveg, numerikus és kategorikus. Annak szemléltetéséhez, hogy milyen nehéz ezekkel az adathalmazokkal dolgozni, adjon hozzá két numerikus funkciót az adathalmazhoz: a véleményezés szószámát és a szó középhosszát.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Besorolás pyspark használatával

A legjobb LogisticRegression-osztályozó kiválasztásához a pyspark kódtár használatával kifejezetten el kell végeznie a következő lépéseket:

  1. A funkciók feldolgozása:
    • A szövegoszlop jogkivonatossá alakítása
    • Kivonatolással kivonatolással kivonatos kivonatolással kivonatosíthatja a tokenizált oszlopot egy vektorban
    • A numerikus funkciók egyesítése a vektorral
  2. A címkeoszlop feldolgozása: a megfelelő típusba öntötte.
  3. Több LogisticRegression-algoritmus betanítása az train adathalmazon különböző hiperparaméterekkel
  4. Számítsa ki az ROC-görbe alatti területet minden betanított modell esetében, és válassza ki az adathalmazon test kiszámított legmagasabb metrikát tartalmazó modellt
  5. A legjobb modell kiértékelése a validation készleten
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Besorolás a SynapseML használatával

A szükséges synapseml lépések egyszerűbbek:

  1. A TrainClassifier Becslő belsőleg featurálja az adatokat, amíg a , , testvalidation adathalmazban trainkijelölt oszlopok a jellemzőket jelölik

  2. A FindBestModel Becslő a betanított modellek készletéből találja meg a legjobb modellt, ha megkeresi azt a modellt, amely a megadott metrika alapján a legjobban teljesít az test adathalmazon

  3. A ComputeModelStatistics Transzformátor egyszerre számítja ki a különböző metrikákat egy pontozott adathalmazon (esetünkben az validation adathalmazon).

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)