使用 SynapseML 的分類工作
在本文中,您會以兩種不同的方式執行相同的分類工作:一次使用純 pyspark
,一次使用 synapseml
程式庫。 這兩種方法會產生相同的效能,但突出強調了相較於 pyspark
,使用 synapseml
的簡單性。
這項工作是預測客戶對 Amazon 上銷售的書籍的評論是否良好 (評等 > 3) 或根據評論的文字為不佳。 您可以使用不同的超參數來訓練羅吉斯迴歸學習者,然後選擇最佳模型來完成此作業。
必要條件
將筆記本連結至 Lakehouse。 在左側,選取 [新增],以新增現有的 Lakehouse 或建立 Lakehouse。
設定
匯入必要的 Python 程式庫並取得 Spark 工作階段。
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
讀取資料
下載並讀取資料。
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
擷取特徵和處理資料
真實資料比上述資料集複雜。 資料集通常具有多種類型的功能,例如文字、數值和類別。 為了說明使用這些資料集有多困難,請將兩個數值特徵新增至資料集:檢閱的字數和平均字長度。
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
使用 pyspark 分類
若要使用 pyspark
程式庫選擇最佳的羅吉斯迴歸分類器,您需要明確地執行下列步驟:
- 處理特徵:
- 標記化文字資料行
- 使用雜湊將 Token 化資料行雜湊為向量
- 將數值特徵與向量合併
- 處理標籤資料行:將期轉換成適當的類型。
- 使用不同超參數在
train
資料集上訓練多個羅吉斯迴歸演算法 - 計算每個經過訓練的模型的 ROC 曲線下的區域,並選取在資料集上計算的最高計量的
test
模型 - 評估
validation
集上的最佳模型
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
使用 SynapseML 分類
所需的 synapseml
步驟更簡單:
TrainClassifier
估算器會在內部將資料特徵化,只要train
、test
、validation
資料集中選取的資料行代表特徵FindBestModel
估算器會從經過訓練的模型的集區中尋找最佳模型,方法是尋找在特定計量指定的test
資料集上執行效能最佳的模型ComputeModelStatistics
轉換器會在評分的資料集上同時計算不同的計量 (在我們的案例中為validation
資料集)
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)