Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
I denne artikel kan du se, hvordan du udfører en bestemt klassificeringsopgave med to metoder. Én metode bruger almindelig pyspark, og én metode bruger biblioteket synapseml . Selvom metoderne giver den samme ydeevne, fremhæver de enkelheden af synapseml sammenlignet med pyspark.
Den opgave, der er beskrevet i denne artikel, forudsiger, om en bestemt kundeanmeldelse af en bog, der er solgt på Amazon, er god (bedømmelse > 3) eller ej, baseret på korrekturteksten. Hvis du vil bygge opgaven, skal du oplære LogisticRegression-elever med forskellige hyperparametre og derefter vælge den bedste model.
Forudsætninger
Vedhæft din notesbog til et lakehouse. I venstre side kan du vælge Tilføj for at tilføje et eksisterende lakehouse, eller du kan oprette et nyt lakehouse.
Opsætte
Importér de nødvendige Python-biblioteker, og få en Spark-session:
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Læs dataene
Download, og læs i dataene:
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Udtræk funktioner og behandl data
Reelle data har større kompleksitet sammenlignet med det datasæt, vi downloadede tidligere. Et datasæt har ofte funktioner af flere typer – f.eks. tekst, numerisk og kategorisk. Hvis du vil vise vanskelighederne med at arbejde med disse datasæt, skal du føje to numeriske funktioner til datasættet: antallet af ord i korrekturen og den gennemsnitlige ordlængde:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Klassificer ved hjælp af pyspark
Hvis du vil vælge den bedste LogisticRegression-klassificeringsklassificering ved hjælp af pyspark biblioteket, skal du eksplicit udføre disse trin:
- Behandl funktionerne
- Tokenize tekstkolonnen
- Hash den tokeniserede kolonne til en vektor ved hjælp af hashing
- Flet de numeriske funktioner med vektoren
- Hvis du vil behandle navnekolonnen, skal du konvertere kolonnen til den korrekte type
- Oplær flere LogisticRegression-algoritmer i
traindatasættet med forskellige hyperparametre - Beregn området under ROC-kurven for hver af de oplærte modeller, og vælg modellen med den højeste metrikværdi som beregnet på
testdatasættet - Evaluer den bedste model på
validationsættet
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Klassificer ved hjælp af SynapseML
Indstillingen synapseml omfatter enklere trin:
TrainClassifierEstimator viser internt dataene, så længe de kolonner, der er valgt itraindatasættet ,testvalidationrepræsenterer funktionerneFindBestModelEstimator finder den bedste model fra en pulje af oplærte modeller. For at gøre dette finder den model, der fungerer bedst påtestdatasættet, ud fra den angivne metrikværdiComputeModelStatisticsTransformeren beregner de forskellige målepunkter for et scoret datasæt (i vores tilfældevalidationdatasættet) på samme tid
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)