SynapseML kullanarak sınıflandırma görevleri

Bu makalede, aynı sınıflandırma görevini iki farklı şekilde gerçekleştirebilirsiniz: bir kez düz pyspark ve bir kez kitaplığı kullanarak synapseml . İki yöntem aynı performansı verir, ancak ile karşılaştırıldığında pysparkkullanmanın synapseml basitliğini vurgular.

Buradaki görev, müşterinin Amazon'da satılan bir kitabı gözden geçirmesinin iyi mi (derecelendirme > 3) yoksa inceleme metnine göre kötü mü olduğunu tahmin etmektir. Bunu, logisticRegression öğrencilerini farklı hiper parametrelerle eğiterek ve en iyi modeli seçerek gerçekleştirebilirsiniz.

Önkoşullar

Not defterinizi bir göle ekleyin. Sol tarafta Ekle'yi seçerek mevcut bir göl evi ekleyin veya bir göl evi oluşturun.

Ayarlama

Gerekli Python kitaplıklarını içeri aktarıp spark oturumu alın.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Verileri okuma

Verileri indirin ve okuyun.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Özellikleri ayıklama ve verileri işleme

Gerçek veriler yukarıdaki veri kümesinden daha karmaşıktır. Bir veri kümesinin metin, sayısal ve kategorik gibi birden çok türde özelliği olması yaygındır. Bu veri kümeleriyle çalışmanın ne kadar zor olduğunu göstermek için veri kümesine iki sayısal özellik ekleyin: incelemenin sözcük sayısı ve ortalama sözcük uzunluğu.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

pyspark kullanarak sınıflandırma

Kitaplığı kullanarak en iyi LogisticRegression sınıflandırıcısını pyspark seçmek için aşağıdaki adımları açıkça gerçekleştirmeniz gerekir:

  1. Özellikleri işleme:
    • Metin sütununu belirteç olarak belirleme
    • Karma oluşturma kullanarak belirteçli sütunu bir vektörde karma hale getir
    • Sayısal özellikleri vektörle birleştirme
  2. Etiket sütununu işleyin: doğru türe yayınlayın.
  3. Farklı hiper parametrelerle veri kümesinde train birden çok LogisticRegression algoritması eğitme
  4. Eğitilen modellerin her biri için ROC eğrisinin altındaki alanı hesaplayıp veri kümesinde test hesaplanan en yüksek ölçüme sahip modeli seçin
  5. Kümedeki en iyi modeli validation değerlendirme
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

SynapseML kullanarak sınıflandırma

ile synapseml gereken adımlar daha basittir:

  1. TrainClassifier Tahmin Aracı, , testvalidation veri kümesinde trainseçilen sütunlar özellikleri gösterdiği sürece verileri dahili olarak öne çıkartır

  2. Tahmin Aracı, FindBestModel belirtilen ölçüme göre veri kümesinde en iyi performansı gösteren modeli bularak eğitilmiş modeller havuzundan test en iyi modeli bulur

  3. ComputeModelStatistics Transformer, puanlanmış bir veri kümesindeki (bizim durumumuzda validation veri kümesi) farklı ölçümleri aynı anda hesaplar

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)