Share via


Skapa en maskininlärningsmodell med Apache Spark MLlib

I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som utför enkel förutsägelseanalys på en öppen Azure-datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.

SparkML och MLlib är centrala Spark-bibliotek som tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter, inklusive verktyg som är lämpliga för:

  • Klassificering
  • Regression
  • Klustring
  • Ämnesmodellering
  • Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
  • Hypotestestning och beräkning av exempelstatistik

Förstå klassificering och logistisk regression

Klassificering, en populär maskininlärningsuppgift, är processen att sortera indata i kategorier. Det är en klassificeringsalgoritms jobb att ta reda på hur du tilldelar etiketter till indata som du anger. Du kan till exempel tänka på en maskininlärningsalgoritm som accepterar aktieinformation som indata och delar in aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.

Logistisk regression är en algoritm som du kan använda för klassificering. Sparks logistiska regressions-API är användbart för binär klassificering eller klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.

Sammanfattningsvis skapar processen för logistisk regression en logistisk funktion som du kan använda för att förutsäga sannolikheten att en indatavektor hör hemma i en grupp eller en annan.

Exempel på förutsägelseanalys på NYC-taxidata

Kom igång genom att installera azureml-opendatasets. Data är tillgängliga via Azure Open Datasets. Den här delmängden av datamängden innehåller information om gula taxiresor, inklusive start- och sluttid och platser, kostnaden och andra attribut.

%pip install azureml-opendatasets

I resten av den här artikeln använder vi Apache Spark för att utföra en analys av nyc taxi-trip-tipsdata och sedan utveckla en modell för att förutsäga om en viss resa innehåller ett tips eller inte.

Skapa en Apache Spark-maskininlärningsmodell

  1. Skapa en PySpark-anteckningsbok. Anvisningar finns i Skapa en notebook-fil.

  2. Importera de typer som krävs för den här notebook-filen.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi använder MLflow för att spåra våra maskininlärningsexperiment och motsvarande körningar. Om Microsoft Fabric Autologging är aktiverat registreras motsvarande mått och parametrar automatiskt.

    import mlflow
    

Konstruera indataramen

I det här exemplet läser vi in data i en Pandas-dataram och konverterar dem sedan till en Apache Spark-dataram. Med det här formatet kan vi använda andra Apache Spark-åtgärder för att rensa och filtrera datamängden.

  1. Kör följande rader för att skapa en Spark DataFrame genom att klistra in koden i en ny cell. Det här steget hämtar data via API:et Öppna datauppsättningar. Vi kan filtrera ned dessa data för att titta på ett specifikt datafönster. I följande kodexempel används start_date och end_date används ett filter som returnerar en enda månad med data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Följande kod minskar datamängden till cirka 10 000 rader. För att påskynda utvecklingen och träningen kommer vi att ta ett exempel på vår datauppsättning för tillfället.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Sedan vill vi ta en titt på våra data med hjälp av det inbyggda display() kommandot. På så sätt kan vi enkelt visa ett exempel på data eller utforska trender i data grafiskt.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Förbereda data

Dataförberedelse är ett viktigt steg i maskininlärningsprocessen. Det handlar om att rensa, transformera och organisera rådata för att göra dem lämpliga för analys och modellering. I följande kod utför du flera dataförberedelsesteg:

  • Ta bort avvikande värden och felaktiga värden genom att filtrera datamängden
  • Ta bort kolumner som inte behövs för modellträning
  • Skapa nya kolumner från rådata
  • Generera en etikett för att avgöra om det kommer att finnas ett tips eller inte för den aktuella taxiresan
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Vi kommer sedan att göra en andra pass over data för att lägga till de sista funktionerna.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Skapa en logistisk regressionsmodell

Den sista uppgiften är att konvertera etiketterade data till ett format som kan analyseras genom logistisk regression. Indata till en logistisk regressionsalgoritm måste vara en uppsättning etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.

Därför måste du konvertera de kategoriska kolumnerna till tal. Mer specifikt behöver du konvertera kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer. Det finns flera metoder för att utföra konverteringen. I följande exempel används OneHotEncoder metoden.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Den här åtgärden resulterar i en ny DataFrame med alla kolumner i rätt format för att träna en modell.

Träna en logistisk regressionsmodell

Den första uppgiften är att dela upp datamängden i en träningsuppsättning och en test- eller valideringsuppsättning.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nu när det finns två DataFrames är nästa uppgift att skapa modellformeln och köra den mot träningsdataramen. Sedan kan du verifiera mot testdataramen. Experimentera med olika versioner av modellformeln för att se effekten av olika kombinationer.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Utdata från den här cellen är:

Area under ROC = 0.9749430523917996

Skapa en visuell representation av förutsägelsen

Nu kan du skapa en slutlig visualisering för att tolka modellresultaten. En ROC-kurva är ett sätt att granska resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.