Klasifikace – před a po SynapseML

Požadavky

  • Připojte poznámkový blok k Lakehouse. Na levé straně vyberte Přidat a přidejte existující lakehouse nebo vytvořte lakehouse.

Úvod

V tomto kurzu provedeme stejnou úlohu klasifikace dvěma různými způsoby: jednou pomocí prostého pysparksynapseml formátu a jednou pomocí knihovny. Tyto dvě metody poskytují stejný výkon, ale jednu ze dvou knihoven je výrazně jednodušší použít a iterovat na (můžete odhadnout kterou z nich?).

Úkol je jednoduchý: Na základě textu recenze předpovědět, jestli je recenze knihy prodané na Amazonu dobrá (hodnocení > 3) nebo špatná. Toho dosáhneme trénováním učujících se LogisticRegression s různými hyperparametry a výběrem nejlepšího modelu.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Čtení dat

Data stáhneme a přečteme.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrahování dalších funkcí a zpracování dat

Skutečná data jsou ale složitější než výše uvedená datová sada. Je běžné, že datová sada má funkce několika typů: text, číselný nebo kategorický. Abychom si ukázali, jak je práce s těmito datovými sadami obtížná, přidáme do datové sady dvě číselné znaky: počet slov v recenzi a střední délku slova.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klasifikace pomocí pysparku

Pokud chcete pomocí pyspark knihovny zvolit nejlepší klasifikátor LogisticRegression, musíte explicitně provést následující kroky:

  1. Zpracování funkcí:
    • Tokenizace textového sloupce
    • Hash tokenizovaného sloupce do vektoru pomocí hash
    • Sloučení číselných prvků s vektorem
  2. Zpracujte sloupec popisku: přetypujte ho na správný typ.
  3. Trénování více algoritmů LogisticRegression v train datové sadě s různými hyperparametry
  4. Výpočet oblasti pod křivkou ROC pro každý z natrénovaných modelů a výběr modelu s nejvyšší metrikou vypočítanou v test datové sadě
  5. Vyhodnocení nejlepšího modelu na validation sadě

Jak vidíte, je tu spousta práce a mnoho kroků, kdy se může něco pokazit.

from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klasifikace pomocí synapsemlu

Život je mnohem jednodušší při použití synapseml!

  1. Nástroj TrainClassifier Estimator interně zhodnotí data, pokud sloupce vybrané v traindatové sadě , testvalidation představují funkce.

  2. Estimátor FindBestModel najde nejlepší model z fondu natrénovaných modelů vyhledáním modelu, který nejlépe funguje u test datové sady s danou zadanou metrikou.

  3. Transformer ComputeModelStatistics vypočítá různé metriky pro datovou sadu se skóre (v našem případě validation datovou sadu) ve stejnou dobu.

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Další kroky