Sınıflandırma - SynapseML Öncesi ve Sonrası

Ön koşullar

  • Not defterinizi bir göl evine ekleyin. Sol tarafta Ekle'yi seçerek mevcut bir göl evi ekleyin veya bir göl evi oluşturun.

Giriş

Bu öğreticide, aynı sınıflandırma görevini iki farklı şekilde gerçekleştireceğiz: düz pyspark ve kitaplığı kullandıktan synapseml sonra. İki yöntem aynı performansı verir, ancak iki kitaplıklardan birinin kullanılması ve yinelenip yeniden kullanılması çok daha kolaydır (hangisi olduğunu tahmin edebilir misiniz?).

Görev basittir: Kullanıcının Amazon'da satılan bir kitabı gözden geçirmesinin iyi mi (derecelendirme > 3) yoksa inceleme metnine göre kötü mü olduğunu tahmin edin. Bunu, farklı hiper parametrelere sahip LogisticRegression öğrencilerini eğiterek ve en iyi modeli seçerek başarıyoruz.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Verileri okuma

Verileri indirip okuyoruz.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Daha fazla özellik ayıklama ve veri işleme

Ancak gerçek veriler yukarıdaki veri kümesinden daha karmaşıktır. Bir veri kümesinin birden çok türde özelliği olması yaygındır: metin, sayısal, kategorik. Bu veri kümeleriyle çalışmanın ne kadar zor olduğunu göstermek için veri kümesine iki sayısal özellik ekleriz: incelemenin sözcük sayısı ve ortalama sözcük uzunluğu.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

pyspark kullanarak sınıflandırma

Kitaplığı kullanarak en iyi LogisticRegression sınıflandırıcısını pyspark seçmek için aşağıdaki adımları açıkça gerçekleştirmeniz gerekir:

  1. Özellikleri işleme:
    • Metin sütununu belirteç olarak belirleme
    • Karma kullanarak belirteçli sütunu bir vektörde karma hale getir
    • Sayısal özellikleri vektörle birleştirme
  2. Etiket sütununu işleme: uygun türe dönüştür.
  3. Farklı hiper parametrelerle veri kümesinde train birden çok LogisticRegression algoritması eğitme
  4. Eğitilen modellerin her biri için ROC eğrisinin altındaki alanı hesaplayıp veri kümesinde test hesaplanan en yüksek ölçüme sahip modeli seçin
  5. Kümedeki en iyi modeli validation değerlendirme

Gördüğünüz gibi, çok sayıda iş ve bir şeylerin ters gidebileceği birçok adım vardır!

from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Synapseml kullanarak sınıflandırma

Kullanımda synapsemlhayat çok daha basittir!

  1. TrainClassifier Tahmin Aracı, , testvalidation veri kümesinde trainseçilen sütunlar özellikleri temsil edene kadar verileri dahili olarak sunar

  2. Tahmin Aracı, FindBestModel belirtilen ölçüme göre veri kümesinde en iyi performansı gösteren modeli bularak eğitilmiş modeller havuzundan test en iyi modeli bulur

  3. ComputeModelStatistics Transformer, puanlanmış bir veri kümesindeki (bizim örneğimizde validation veri kümesi) farklı ölçümleri aynı anda hesaplar

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)

Sonraki adımlar