Bygg en maskinlæringsmodell med Apache Spark MLlib
I denne artikkelen lærer du hvordan du bruker Apache Spark MLlib til å opprette et maskinlæringsprogram som gjør enkle prediktive analyser på et Azure-åpent datasett. Spark gir innebygde maskinlæringsbiblioteker. Dette eksemplet bruker klassifisering gjennom logistisk regresjon.
SparkML og MLlib er kjernebiblioteker i Spark som tilbyr mange verktøy som er nyttige for maskinlæringsoppgaver, inkludert verktøy som er egnet for:
- Klassifisering
- Regresjon
- Klynging
- Emnemodellering
- Entallsverdikomponering (SVD) og hovedkomponentanalyse (PCA)
- Hypotesetesting og beregning av eksempelstatistikk
Forstå klassifisering og logistisk regresjon
Klassifisering, en populær maskinlæringsoppgave, er prosessen med å sortere inndata i kategorier. Det er jobben til en klassifiseringsalgoritme å finne ut hvordan du tilordner etiketter til inndata som du oppgir . Du kan for eksempel tenke på en maskinlæringsalgoritme som godtar aksjeinformasjon som inndata og deler aksjen i to kategorier: aksjer som du bør selge og aksjer som du bør beholde.
Logistisk regresjon er en algoritme som du kan bruke til klassifisering. Sparks logistiske regresjons-API er nyttig for binær klassifisering, eller klassifisering av inndata i én av to grupper. Hvis du vil ha mer informasjon om logistisk regresjon, kan du se Wikipedia.
Oppsummert produserer prosessen med logistisk regresjon en logistisk funksjon som du kan bruke til å forutsi sannsynligheten for at en inndatavektor hører til i én gruppe eller den andre.
Eksempel på prediktiv analyse på NYC-taxidata
Kom i gang ved å installere azureml-opendatasets
. Dataene er tilgjengelige via Azure Open Datasets. Dette delsettet av datasettet inneholder informasjon om gule taxiturer, inkludert start- og sluttidspunkt og plasseringer, kostnader og andre attributter.
%pip install azureml-opendatasets
I resten av denne artikkelen bruker vi Apache Spark til å utføre noen analyser på NYC taxi-trip tips data og deretter utvikle en modell for å forutsi om en bestemt tur inkluderer et tips eller ikke.
Opprette en Apache Spark-maskinlæringsmodell
Opprett en PySpark-notatblokk. Hvis du vil ha instruksjoner, kan du se Opprette en notatblokk.
Importer typene som kreves for denne notatblokken.
import matplotlib.pyplot as plt from datetime import datetime from dateutil import parser from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml import PipelineModel from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer from pyspark.ml.classification import LogisticRegression from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.ml.evaluation import BinaryClassificationEvaluator
Vi vil bruke MLflow til å spore våre maskinlæringseksperimenter og tilsvarende kjøringer. Hvis Microsoft Fabric Autologging er aktivert, registreres de tilsvarende måledataene og parameterne automatisk.
import mlflow
Konstruere datarammen for inndata
I dette eksemplet laster vi inn dataene i en Pandas-dataramme og konverterer dem til en Apache Spark-dataramme. Med dette formatet kan vi bruke andre Apache Spark-operasjoner for å rense og filtrere datasettet.
Kjør følgende linjer for å opprette en Spark DataFrame ved å lime inn koden i en ny celle. Dette trinnet henter dataene via API-en For åpne datasett. Vi kan filtrere disse dataene ned for å se på et bestemt datavindu. Følgende kodeeksempel bruker
start_date
ogend_date
bruker et filter som returnerer én enkelt måned med data.from azureml.opendatasets import NycTlcYellow end_date = parser.parse('2018-06-06') start_date = parser.parse('2018-05-01') nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date) nyc_tlc_pd = nyc_tlc.to_pandas_dataframe() nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
Følgende kode reduserer datasettet til omtrent 10 000 rader. For å øke hastigheten på utviklingen og opplæringen, vil vi prøve ned datasettet for øyeblikket.
# To make development easier, faster, and less expensive, sample down for now sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
Deretter ønsker vi å ta en titt på dataene våre ved hjelp av den innebygde
display()
kommandoen. Dette gjør at vi enkelt kan vise et utvalg av dataene eller utforske trendene i dataene grafisk.#sampled_taxi_df.show(10) display(sampled_taxi_df.limit(10))
Klargjøre dataene
Dataforberedelse er et viktig trinn i maskinlæringsprosessen. Det innebærer rengjøring, transformering og organisering av rådata for å gjøre dem egnet for analyse og modellering. I følgende kode utfører du flere trinn for klargjøring av data:
- Fjern ytterpunkter og uriktige verdier ved å filtrere datasettet
- Fjern kolonner som ikke er nødvendige for modellopplæring
- Opprette nye kolonner fra rådataene
- Generer en etikett for å finne ut om det vil være et tips eller ikke for den gitte taxituren
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
, 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
, date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
, date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
, (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
, (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
)\
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Vi vil deretter gjøre en ny overføring over dataene for å legge til de endelige funksjonene.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
, 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
, when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
.when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
.when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
.when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
.otherwise(0).alias('trafficTimeBins')
)\
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Opprette en logistisk regresjonsmodell
Den endelige oppgaven er å konvertere de merkede dataene til et format som kan analyseres gjennom logistisk regresjon. Inndataene til en logistisk regresjonsalgoritme må være et sett med etikett-/funksjonsvektorpar, der funksjonsvektoren er en vektor av tall som representerer inndatapunktet.
Så du må konvertere de kategoriske kolonnene til tall. Spesielt må du konvertere trafficTimeBins
kolonnene og weekdayString
kolonnene til heltallsrepresentasjoner. Det finnes flere fremgangsmåter for å utføre konverteringen. Eksemplet nedenfor tar OneHotEncoder
fremgangsmåten.
# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")
# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Denne handlingen resulterer i en ny DataFrame med alle kolonner i riktig format for å lære opp en modell.
Lære opp en logistisk regresjonsmodell
Den første oppgaven er å dele datasettet inn i et opplæringssett og et test- eller valideringssett.
# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Nå som det finnes to DataFrames, er neste oppgave å opprette modellformelen og kjøre den mot opplæringsdatarammen. Deretter kan du validere mot testdatarammen. Eksperimenter med ulike versjoner av modellformelen for å se virkningen av ulike kombinasjoner.
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')
## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")
## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)
Utdataene fra denne cellen er:
Area under ROC = 0.9749430523917996
Opprette en visuell representasjon av prognosen
Nå kan du konstruere en endelig visualisering for å tolke modellresultatene. En ROC-kurve er én måte å se gjennom resultatet på.
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
Relatert innhold
- Bruk AI-eksempler til å bygge maskinlæringsmodeller: Bruk AI-eksempler
- Spor maskinlæringskjøringer ved hjelp av eksperimenter: Maskinlæringseksperimenter
Tilbakemeldinger
https://aka.ms/ContentUserFeedback.
Kommer snart: Gjennom 2024 faser vi ut GitHub Issues som tilbakemeldingsmekanisme for innhold, og erstatter det med et nytt system for tilbakemeldinger. Hvis du vil ha mer informasjon, kan du se:Send inn og vis tilbakemelding for