Jaa


Luokitustehtävät SynapseML:n avulla

Tässä artikkelissa suoritat saman luokitustehtävän kahdella eri tavalla: käyttäessäsi vain- pyspark ja kerran -kirjastoa synapseml . Nämä kaksi menetelmää tuottavat saman suorituskyvyn, mutta korostavat -menetelmän yksinkertaisuuden synapseml verrattuna kohteeseen pyspark.

Tehtävänä on ennustaa, onko asiakkaan arvio Amazonissa myydystä kirjasta hyvä (luokitus > 3) vai huono arvostelun tekstin perusteella. Voit toteuttaa tämän kouluttamalla LogisticRegression-oppijat, joilla on eri hyperparametrit, ja valitsemalla parhaan mallin.

Edellytykset

Liitä muistikirjasi Lakehouseen. Valitse vasemmalla puolella Lisää lisätäksesi aiemmin luodun lakehousen tai luodaksesi lakehousen.

Asetusten määrittäminen

Tuo tarvittavat Python-kirjastot ja saat spark-istunnon.

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

Tietojen lukeminen

Lataa ja lue tiedot.

rawData = spark.read.parquet(
    "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)

Poimi ominaisuuksia ja prosessitietoja

Reaalitiedot ovat monimutkaisempia kuin yllä oleva tietojoukko. On tavallista, että tietojoukossa on useita erityyppisiä ominaisuuksia, kuten teksti, numeerinen tai luokittainen. Voit havainnollistaa, kuinka vaikeaa näiden tietojoukkojen kanssa on toimia, lisäämällä tietojoukkoon kaksi numeerista ominaisuutta: tarkistuksen sanamäärä ja sanan keskimääräinen pituus.

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def wordCount(s):
    return len(s.split())


def wordLength(s):
    import numpy as np

    ss = [len(w) for w in s.split()]
    return round(float(np.mean(ss)), 2)


wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer

wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
    inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
    inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline

data = (
    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
    .fit(rawData)
    .transform(rawData)
    .withColumn("label", rawData["rating"] > 3)
    .drop("rating")
)
data.show(5)

Luokittele pysparkin avulla

Jos haluat valita kirjastoa käyttävän parhaan LogisticRegression-luokituksen pyspark , sinun on suoritettava eksplisiittisesti seuraavat vaiheet:

  1. Käsittele ominaisuudet:
    • Tunnus tekstisarakkeen tunnuksiin
    • Hajautustoiminnon avulla tunnustettu sarake vektoriksi
    • Numeeristen ominaisuuksien yhdistäminen vektoriin
  2. Käsittele otsikkosarake: heitä se oikeaan tyyppiin.
  3. Harjoita useita tietojoukon train LogisticRegression-algoritmeja eri hyperparametereilla
  4. Laske roc-käyrän alla oleva alue kullekin harjoitetulle mallille ja valitse malli, jolla on suurin mittari tietojoukkoon laskettuna test .
  5. Arvioi tietojoukon paras malli validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler

# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
    inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)

# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)

# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
    "label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression

# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []

# Select the best model
for learner in logisticRegressions:
    model = learner.fit(train)
    models.append(model)
    scoredData = model.transform(test)
    metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]

# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))

Luokittele SynapseML:n avulla

Tarvittavat vaiheet synapseml ovat yksinkertaisempia:

  1. TrainClassifier Estimator-ominaisuus sisältää tiedot sisäisesti, kunhan tietojoukossa traintestvalidation valitut sarakkeet edustavat ominaisuuksia

  2. FindBestModel Estimator löytää parhaan mallin harjoitetuista malleista etsimällä mallin, joka suoriutuu parhaiten tietojoukosta test määritetyn arvon mukaisesti

  3. ComputeModelStatistics Muuntaja laskee pisteytetun tietojoukon eri mittarit (tässä tapauksessa validation tietojoukon) samalla kertaa

from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel

# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)

# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
    for lrm in logisticRegressions
]

# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)


# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
    "Best model's AUC on validation set = "
    + "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)