次の方法で共有


SynapseML を使用した分類タスク

この記事では、2 つの方法で特定の分類タスクを実行する方法について説明します。 1 つのメソッドではプレーン pysparkを使用し、1 つのメソッドでは synapseml ライブラリを使用します。 メソッドのパフォーマンスは同じですが、pysparkと比較してsynapsemlのシンプルさが強調されています。

この記事で説明するタスクでは、レビューテキストに基づいて、Amazon で販売された書籍の特定の顧客レビューが良い (評価 > 3) か悪いかを予測します。 タスクを構築するには、さまざまなハイパーパラメーターを使用して LogisticRegression 学習者をトレーニングし、最適なモデルを選択します。

[前提条件]

ノートブックをレイクハウスにアタッチします。 左側で、[ 追加 ] を選択して既存のレイクハウスを追加するか、新しいレイクハウスを作成できます。

セットアップ

必要な Python ライブラリをインポートし、Spark セッションを取得します。

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

データを読み取る

データをダウンロードして読み取る:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

特徴の抽出とデータの処理

実際のデータは、前にダウンロードしたデータセットと比較して複雑になります。 データセットには、多くの場合、テキスト、数値、カテゴリなど、複数の型の特徴があります。 これらのデータセットの操作の難しさを示すには、レビューの 単語数平均単語の長さという 2 つの数値特徴をデータセットに追加します。

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

pyspark を使用して分類する

pyspark ライブラリを使用して最適な LogisticRegression 分類子を選択するには、次の手順を明示的に実行する必要があります。

  1. 機能を処理する
    • テキスト列をトークン化する
    • ハッシュを使用してトークン化された列をベクターにハッシュする
    • 数値特徴をベクトルとマージする
  2. ラベル列を処理するには、その列を適切な型にキャストします
  3. 異なるハイパーパラメーターを使用して、 train データセットで複数の LogisticRegression アルゴリズムをトレーニングする
  4. トレーニング済みの各モデルの ROC 曲線の下の領域を計算し、 test データセットで計算されたメトリックが最も高いモデルを選択します
  5. validation セットで最適なモデルを評価する
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

SynapseML を使用した分類

synapseml オプションには、より簡単な手順が含まれます。

  1. TrainClassifier Estimator は、traintestvalidation データセットで選択された列が特徴を表す限り、データを内部的に特徴付けします

  2. FindBestModel Estimator は、トレーニング済みのモデルのプールから最適なモデルを検索します。 これを行うには、指定されたメトリックに基づいて、 test データセットに対して最適なパフォーマンスを発揮するモデルを検索します

  3. ComputeModelStatistics Transformer は、スコア付けされたデータセット (ここでは、validation データセット) に対するさまざまなメトリックを同時に計算します

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)