Sdílet prostřednictvím


Úlohy klasifikace pomocí SynapseML

Tento článek ukazuje, jak provést konkrétní úlohu klasifikace se dvěma metodami. Jedna metoda používá prostý pysparka jedna metoda používá knihovnu synapseml . I když metody přinášejí stejný výkon, zvýrazňují jednoduchost synapseml ve srovnání s pyspark.

Úloha popsaná v tomto článku předpovídá, jestli je na základě textu recenze recenze zákazníka na Amazonu dobrá (hodnocení > 3) nebo špatná. K vytvoření úlohy vytrénujete algoritmy Logistické regrese s různými hyperparametry a poté vyberete nejlepší model.

Požadavky

Připojte poznámkový blok k jezeru. Na levé straně můžete vybrat Přidat , pokud chcete přidat existující jezero, nebo můžete vytvořit nový jezerní dům.

Nastavení

Naimportujte potřebné knihovny Pythonu a získejte relaci Sparku:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Přečtěte si data

Stáhněte si data a přečtěte si je:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Extrakce funkcí a zpracování dat

Skutečná data jsou ve srovnání s datovou sadou, kterou jsme si stáhli dříve, složitější. Datová sada často obsahuje funkce více typů – například text, číselný a kategorický. Pokud chcete ukázat potíže při práci s těmito datovými sadami, přidejte do datové sady dvě číselné funkce: počet slov recenze a střední délku slova:

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Klasifikace pomocí pysparku

Pokud chcete zvolit nejlepší klasifikátor LogisticRegression pomocí pyspark knihovny, musíte explicitně provést tyto kroky:

  1. Zpracování funkcí
    • Tokenizace textového sloupce
    • Hash tokenizovaného sloupce do vektoru pomocí hashování
    • Sloučení číselných prvků s vektorem
  2. Aby bylo možné sloupec štítku zpracovat, přetypujte tento sloupec na správný typ.
  3. Trénování několika algoritmů LogistickéRegrese v train datové sadě s různými hyperparametry
  4. Vypočítá oblast pod křivkou ROC pro každý natrénovaný model a vybere model s nejvyšší metrikou vypočítanou v test datové sadě.
  5. Vyhodnocení nejlepšího modelu v validation sadě
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Klasifikace pomocí SynapseML

Tato synapseml možnost zahrnuje jednodušší kroky:

  1. TrainClassifier Estimátor interně featurizuje data, pokud sloupce vybrané v datové sadě train, test a validation představují funkce.

  2. FindBestModel Estimátor najde nejlepší model z fondu natrénovaných modelů. K tomu najde model, který nejlépe funguje s test datovou sadou zadanou metrikou.

  3. ComputeModelStatistics Transformer vypočítá různé metriky pro vyhodnocenou datovou sadu (v našem případě validation datovou sadu) najednou.

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)