SynapseML을 사용한 분류 작업

이 문서에서는 동일한 분류 작업을 두 가지 방법으로 수행합니다. 한 번은 일반 pyspark 을 사용하고 한 번은 라이브러리를 synapseml 사용하게 됩니다. 두 메서드는 동일한 성능을 제공하지만 사용 편의성을 synapseml 강조 표시합니다 pyspark.

이 작업은 고객이 Amazon에서 판매한 책에 대한 리뷰가 리뷰 텍스트에 따라 양호(등급 > 3)인지 또는 나쁜지 예측하는 것입니다. 다양한 하이퍼 매개 변수를 사용하여 LogisticRegression 학습자를 학습시키고 최상의 모델을 선택하여 이 작업을 수행합니다.

필수 조건

레이크하우스에 전자 필기장을 첨부합니다. 왼쪽에서 추가를 선택하여 기존 레이크하우스를 추가하거나 레이크하우스를 만듭니다.

설정

필요한 Python 라이브러리를 가져오고 Spark 세션을 가져옵니다.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

데이터 읽기

데이터를 다운로드하고 읽습니다.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

기능 추출 및 데이터 처리

실제 데이터는 위의 데이터 세트보다 더 복잡합니다. 데이터 세트에는 텍스트, 숫자 및 범주와 같은 여러 형식의 기능이 있는 것이 일반적입니다. 이러한 데이터 세트를 사용하는 것이 얼마나 어려운지 설명하려면 데이터 세트 에 검토 단어 수평균 단어 길이라는 두 가지 숫자 기능을 추가합니다.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

pyspark를 사용하여 분류

라이브러리를 사용하여 pyspark 최상의 LogisticRegression 분류자를 선택하려면 다음 단계를 명시적으로 수행해야 합니다.

  1. 기능을 처리합니다.
    • 텍스트 열 토큰화
    • 해시를 사용하여 토큰화된 열을 벡터로 해시
    • 숫자 기능을 벡터와 병합
  2. 레이블 열을 처리합니다. 적절한 형식으로 캐스팅합니다.
  3. 서로 다른 하이퍼 매개 변수를 사용하여 train 데이터 세트에서 여러 LogisticRegression 알고리즘 학습
  4. 학습된 각 모델의 ROC 곡선 아래 영역을 계산하고 데이터 세트에서 계산된 test 메트릭이 가장 높은 모델을 선택합니다.
  5. 집합에서 최상의 모델 validation 평가
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

SynapseML을 사용하여 분류

필요한 synapseml 단계는 더 간단합니다.

  1. TrainClassifier 추정기에서 선택한 traintestvalidation 열이 기능을 나타내는 경우 데이터 세트는 내부적으로 데이터를 특징으로 합니다.

  2. 예측 도구는 FindBestModel 지정된 메트릭에 따라 데이터 세트에서 가장 잘 수행되는 모델을 찾아 학습된 모델 풀에서 최상의 test 모델을 찾습니다.

  3. ComputeModelStatistics 변환기는 점수가 매기된 데이터 세트(이 경우 validation 데이터 세트)에서 다른 메트릭을 동시에 계산합니다.

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)