Besorolási feladatok a SynapseML használatával
Ebben a cikkben ugyanazt a besorolási feladatot kétféleképpen hajtja végre: egyszer egyszerű pyspark
és egyszer a synapseml
tár használatával. A két módszer ugyanazt a teljesítményt nyújtja, de kiemeli a használat synapseml
egyszerűségét.pyspark
A feladat annak előrejelzése, hogy az ügyfél által az Amazonon értékesített könyv véleménye jó (3. értékelés > ) vagy rossz-e a felülvizsgálat szövege alapján. Ezt a LogisticRegression-tanulók különböző hiperparaméterekkel való betanításával és a legjobb modell kiválasztásával érheti el.
Előfeltételek
Csatolja a jegyzetfüzetet egy tóházhoz. A bal oldalon válassza a Hozzáadás lehetőséget egy meglévő tóház hozzáadásához vagy egy tóház létrehozásához.
Beállítás
Importálja a szükséges Python-kódtárakat, és szerezzen be egy Spark-munkamenetet.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Az adatok olvasása
Töltse le és olvassa be az adatokat.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Funkciók kinyerés és adatok feldolgozása
A valós adatok összetettebbek a fenti adatkészletnél. Gyakran előfordul, hogy egy adathalmaznak több típusa van, például szöveg, numerikus és kategorikus. Annak szemléltetéséhez, hogy milyen nehéz ezekkel az adathalmazokkal dolgozni, adjon hozzá két numerikus funkciót az adathalmazhoz: a véleményezés szószámát és a szó középhosszát.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Besorolás pyspark használatával
A legjobb LogisticRegression-osztályozó kiválasztásához a pyspark
kódtár használatával kifejezetten el kell végeznie a következő lépéseket:
- A funkciók feldolgozása:
- A szövegoszlop jogkivonatossá alakítása
- Kivonatolással kivonatolással kivonatos kivonatolással kivonatosíthatja a tokenizált oszlopot egy vektorban
- A numerikus funkciók egyesítése a vektorral
- A címkeoszlop feldolgozása: a megfelelő típusba öntötte.
- Több LogisticRegression-algoritmus betanítása az
train
adathalmazon különböző hiperparaméterekkel - Számítsa ki az ROC-görbe alatti területet minden betanított modell esetében, és válassza ki az adathalmazon
test
kiszámított legmagasabb metrikát tartalmazó modellt - A legjobb modell kiértékelése a
validation
készleten
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Besorolás a SynapseML használatával
A szükséges synapseml
lépések egyszerűbbek:
A
TrainClassifier
Becslő belsőleg featurálja az adatokat, amíg a , ,test
validation
adathalmazbantrain
kijelölt oszlopok a jellemzőket jelölikA
FindBestModel
Becslő a betanított modellek készletéből találja meg a legjobb modellt, ha megkeresi azt a modellt, amely a megadott metrika alapján a legjobban teljesít aztest
adathalmazonA
ComputeModelStatistics
Transzformátor egyszerre számítja ki a különböző metrikákat egy pontozott adathalmazon (esetünkben azvalidation
adathalmazon).
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)