共用方式為


使用 SynapseML 的分類工作

本文說明如何使用兩種方法來執行特定分類工作。 一個方法使用純 pyspark,而一個方法使用 synapseml 庫。 儘管這些方法產生相同的效能,但它們強調了synapsemlpyspark相比的簡單性。

本文所述的工作是根據評論文字來預測亞馬遜上銷售的書籍的特定客戶評論是否良好(評等 ≥ > 3)或不佳。 要建立此任務,您可以使用不同的超參數來訓練邏輯迴歸學習者,然後選擇最佳模型。

必要條件

將您的筆記本連接到 Lakehouse。 在左側,您可以選取 [ 新增 ] 來新增現有的 Lakehouse,也可以建立新的 Lakehouse。

設定

匯入必要的 Python 函式庫,並建立 Spark 會話:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

讀取資料

下載並讀取資料:

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

擷取特徵和處理資料

相較於我們稍早下載的數據集,實際數據更為複雜。 數據集通常具有多種類型的特徵,例如文字、數值和類別。 若要顯示使用這些數據集的困難,請將兩個數值特徵新增至數據集:檢閱的 字數平均字長度

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

使用 pyspark 分類

若要使用 pyspark 連結庫選擇最佳的羅吉斯回歸分類器,您必須 明確地 執行下列步驟:

  1. 處理功能
    • 標記化文字資料行
    • 將詞元化的資料行通過雜湊轉換為向量
    • 將數值特徵與向量合併
  2. 若要處理標籤數據列,請將該資料行轉換成適當的類型
  3. 使用不同的超參數,在train數據集上訓練多個邏輯斯迴歸演算法。
  4. 計算每個定型模型之 ROC 曲線下的區域,然後選取數據集上計算的最高計量的 test 模型
  5. 評估 validation 資料集的最佳模型
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

使用 SynapseML 分類

這個選項 synapseml 牽涉到更簡單的步驟:

  1. TrainClassifier估算器會在內部將數據特徵化,只要traintestvalidation資料集中選取的數據行代表特徵

  2. FindBestModel估算器會從定型模型集區中尋找最佳模型。 若要這樣做,它會在指定的計量下,尋找在數據集上執行最佳效能的 test 模型

  3. ComputeModelStatistics 轉換器會在評分的資料集上同時計算不同的計量 (在我們的案例中為 validation 資料集)

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)