Bygg en maskinlæringsmodell med Apache Spark MLlib

I denne artikkelen lærer du hvordan du bruker Apache Spark MLlib til å opprette et maskinlæringsprogram som gjør enkle prediktive analyser på et Azure-åpent datasett. Spark gir innebygde maskinlæringsbiblioteker. Dette eksemplet bruker klassifisering gjennom logistisk regresjon.

SparkML og MLlib er kjernebiblioteker i Spark som tilbyr mange verktøy som er nyttige for maskinlæringsoppgaver, inkludert verktøy som er egnet for:

  • Klassifisering
  • Regresjon
  • Klynging
  • Emnemodellering
  • Entallsverdikomponering (SVD) og hovedkomponentanalyse (PCA)
  • Hypotesetesting og beregning av eksempelstatistikk

Forstå klassifisering og logistisk regresjon

Klassifisering, en populær maskinlæringsoppgave, er prosessen med å sortere inndata i kategorier. Det er jobben til en klassifiseringsalgoritme å finne ut hvordan du tilordner etiketter til inndata som du oppgir . Du kan for eksempel tenke på en maskinlæringsalgoritme som godtar aksjeinformasjon som inndata og deler aksjen i to kategorier: aksjer som du bør selge og aksjer som du bør beholde.

Logistisk regresjon er en algoritme som du kan bruke til klassifisering. Sparks logistiske regresjons-API er nyttig for binær klassifisering, eller klassifisering av inndata i én av to grupper. Hvis du vil ha mer informasjon om logistisk regresjon, kan du se Wikipedia.

Oppsummert produserer prosessen med logistisk regresjon en logistisk funksjon som du kan bruke til å forutsi sannsynligheten for at en inndatavektor hører til i én gruppe eller den andre.

Eksempel på prediktiv analyse på NYC-taxidata

Kom i gang ved å installere azureml-opendatasets. Dataene er tilgjengelige via Azure Open Datasets. Dette delsettet av datasettet inneholder informasjon om gule taxiturer, inkludert start- og sluttidspunkt og plasseringer, kostnader og andre attributter.

%pip install azureml-opendatasets

I resten av denne artikkelen bruker vi Apache Spark til å utføre noen analyser på NYC taxi-trip tips data og deretter utvikle en modell for å forutsi om en bestemt tur inkluderer et tips eller ikke.

Opprette en Apache Spark-maskinlæringsmodell

  1. Opprett en PySpark-notatblokk. Hvis du vil ha instruksjoner, kan du se Opprette en notatblokk.

  2. Importer typene som kreves for denne notatblokken.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi vil bruke MLflow til å spore våre maskinlæringseksperimenter og tilsvarende kjøringer. Hvis Microsoft Fabric Autologging er aktivert, registreres de tilsvarende måledataene og parameterne automatisk.

    import mlflow
    

Konstruere datarammen for inndata

I dette eksemplet laster vi inn dataene i en Pandas-dataramme og konverterer dem til en Apache Spark-dataramme. Med dette formatet kan vi bruke andre Apache Spark-operasjoner for å rense og filtrere datasettet.

  1. Kjør følgende linjer for å opprette en Spark DataFrame ved å lime inn koden i en ny celle. Dette trinnet henter dataene via API-en For åpne datasett. Vi kan filtrere disse dataene ned for å se på et bestemt datavindu. Følgende kodeeksempel bruker start_date og end_date bruker et filter som returnerer én enkelt måned med data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Følgende kode reduserer datasettet til omtrent 10 000 rader. For å øke hastigheten på utviklingen og opplæringen, vil vi prøve ned datasettet for øyeblikket.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Deretter ønsker vi å ta en titt på dataene våre ved hjelp av den innebygde display() kommandoen. Dette gjør at vi enkelt kan vise et utvalg av dataene eller utforske trendene i dataene grafisk.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Klargjøre dataene

Dataforberedelse er et viktig trinn i maskinlæringsprosessen. Det innebærer rengjøring, transformering og organisering av rådata for å gjøre dem egnet for analyse og modellering. I følgende kode utfører du flere trinn for klargjøring av data:

  • Fjern ytterpunkter og uriktige verdier ved å filtrere datasettet
  • Fjern kolonner som ikke er nødvendige for modellopplæring
  • Opprette nye kolonner fra rådataene
  • Generer en etikett for å finne ut om det vil være et tips eller ikke for den gitte taxituren
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Vi vil deretter gjøre en ny overføring over dataene for å legge til de endelige funksjonene.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opprette en logistisk regresjonsmodell

Den endelige oppgaven er å konvertere de merkede dataene til et format som kan analyseres gjennom logistisk regresjon. Inndataene til en logistisk regresjonsalgoritme må være et sett med etikett-/funksjonsvektorpar, der funksjonsvektoren er en vektor av tall som representerer inndatapunktet.

Så du må konvertere de kategoriske kolonnene til tall. Spesielt må du konvertere trafficTimeBins kolonnene og weekdayString kolonnene til heltallsrepresentasjoner. Det finnes flere fremgangsmåter for å utføre konverteringen. Eksemplet nedenfor tar OneHotEncoder fremgangsmåten.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handlingen resulterer i en ny DataFrame med alle kolonner i riktig format for å lære opp en modell.

Lære opp en logistisk regresjonsmodell

Den første oppgaven er å dele datasettet inn i et opplæringssett og et test- eller valideringssett.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nå som det finnes to DataFrames, er neste oppgave å opprette modellformelen og kjøre den mot opplæringsdatarammen. Deretter kan du validere mot testdatarammen. Eksperimenter med ulike versjoner av modellformelen for å se virkningen av ulike kombinasjoner.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Utdataene fra denne cellen er:

Area under ROC = 0.9749430523917996

Opprette en visuell representasjon av prognosen

Nå kan du konstruere en endelig visualisering for å tolke modellresultatene. En ROC-kurve er én måte å se gjennom resultatet på.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.