Byg en model til maskinel indlæring med Apache Spark MLlib
I denne artikel får du mere at vide om, hvordan du bruger Apache Spark MLlib til at oprette et program til maskinel indlæring, der håndterer enkel forudsigende analyse på et Azure Open-datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.
Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:
- Klassificering
- Klyngedannelse
- Hypotesetest og beregning af eksempelstatistik
- Regression
- SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
- Emnemodellering
Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassificeringsalgoritme skal finde ud af, hvordan du tildeler mærkater til de angivne inputdata. En algoritme til maskinel indlæring kan f.eks. acceptere aktieoplysninger som input og opdele bestanden i to kategorier: aktier, som du skal sælge, og aktier, som du skal beholde.
Logistisk regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.
Logistisk regression producerer en logistisk funktion , der kan forudsige sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.
Først skal du installere azureml-opendatasets
. Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.
%pip install azureml-opendatasets
Resten af denne artikel er afhængig af Apache Spark for først at udføre en analyse af DATAENE for NYC-drikkepenge og derefter udvikle en model for at forudsige, om en bestemt tur indeholder et tip eller ej.
Opret en PySpark-notesbog. Du kan finde flere oplysninger under Opret en notesbog.
Importér de typer, der kræves til denne notesbog.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Vi bruger MLflow til at spore vores eksperimenter med maskinel indlæring og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.
import mlflow
Dette eksempel indlæser dataene i en Pandas-dataramme og konverterer dem derefter til en Apache Spark-dataramme. I dette format kan vi anvende andre Apache Spark-handlinger for at rense og filtrere datasættet.
Indsæt disse linjer i en ny celle, og kør dem for at oprette en Spark DataFrame. Dette trin henter dataene via API'en Open Datasets. Vi kan filtrere disse data ned for at undersøge et bestemt vindue med data. Kodeeksempel bruger
start_date
ogend_date
anvender et filter, der returnerer en enkelt måneds data.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Denne kode reducerer datasættet til ca. 10.000 rækker. Kodeeksempler ned i vores datasæt indtil videre for at fremskynde udviklingen og oplæringen.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Vi vil gerne se på vores data ved hjælp af den indbyggede
display()
kommando. Med denne kommando kan vi nemt få vist et dataeksempel eller udforske tendenser i dataene grafisk.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det omfatter rengøring, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette kodeeksempel udfører du flere trin til dataforberedelse:
- Filtrer datasættet for at fjerne udenforliggende værdier og forkerte værdier
- Fjern kolonner, der ikke er nødvendige til modeltræning
- Opret nye kolonner ud fra rådata
- Opret en mærkat for at bestemme, om en given taxatur omfatter et tip eller ej
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Derefter skal du foretage endnu et pass over dataene for at tilføje de endelige funktioner.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.
Baseret på de endelige opgavekrav skal vi konvertere de kategoriske kolonner til tal. Vi skal specifikt konvertere kolonnerne trafficTimeBins
og weekdayString
til heltalsrepræsentationer. Vi har mange muligheder til at håndtere dette krav. Dette eksempel omfatter tilgangen OneHotEncoder
:
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Denne handling resulterer i en ny DataFrame med alle kolonner i det korrekte format for at oplære en model.
Den første opgave opdeler datasættet i et oplæringssæt og et test- eller valideringssæt.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Når vi har to DataFrames, skal vi oprette modelformlen og køre den mod træningsdatarammen. Derefter kan vi validere i forhold til testdataframen. Eksperimentér med forskellige versioner af modelformlen for at se effekten af forskellige kombinationer.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Celleoutputtet:
Area under ROC = 0.9749430523917996
Vi kan nu oprette en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve kan helt sikkert præsentere resultatet.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
- Brug AI-eksempler til at bygge modeller til maskinel indlæring: Brug AI-eksempler
- Spor kørsler af maskinel indlæring ved hjælp af eksperimenter: Eksperimenter med maskinel indlæring