Dela via


Självstudie: Skapa en maskininlärningsapp med Apache Spark MLlib och Azure Synapse Analytics

I den här artikeln får du lära dig hur du använder Apache Spark MLlib för att skapa ett maskininlärningsprogram som utför enkel förutsägelseanalys på en öppen Azure-datauppsättning. Spark tillhandahåller inbyggda maskininlärningsbibliotek. I det här exemplet används klassificering via logistisk regression.

SparkML och MLlib är centrala Spark-bibliotek som tillhandahåller många verktyg som är användbara för maskininlärningsuppgifter, inklusive verktyg som är lämpliga för:

  • Klassificering
  • Regression
  • Klustring
  • Ämnesmodellering
  • Singulär värde nedbrytning (SVD) och huvudkomponentanalys (PCA)
  • Hypotestestning och beräkning av exempelstatistik

Förstå klassificering och logistisk regression

Klassificering, en populär maskininlärningsuppgift, är processen att sortera indata i kategorier. Det är en klassificeringsalgoritms jobb att ta reda på hur du tilldelar etiketter till indata som du anger. Du kan till exempel tänka på en maskininlärningsalgoritm som accepterar aktieinformation som indata och dela in aktien i två kategorier: aktier som du bör sälja och aktier som du bör behålla.

Logistisk regression är en algoritm som du kan använda för klassificering. Sparks logistiska regressions-API är användbart för binär klassificering eller klassificering av indata i en av två grupper. Mer information om logistisk regression finns i Wikipedia.

Sammanfattningsvis skapar processen för logistisk regression en logistisk funktion som du kan använda för att förutsäga sannolikheten för att en indatavektor tillhör den ena gruppen eller den andra.

Exempel på förutsägelseanalys på NYC-taxidata

I det här exemplet använder du Spark för att utföra en förutsägelseanalys av tipsdata för taxiresor från New York. Data är tillgängliga via Azure Open Datasets. Den här delmängden av datamängden innehåller information om gula taxiresor, inklusive information om varje resa, start- och sluttid och platser, kostnaden och andra intressanta attribut.

Viktigt

Det kan tillkomma ytterligare avgifter för att hämta dessa data från lagringsplatsen.

I följande steg utvecklar du en modell för att förutsäga om en viss resa innehåller ett tips eller inte.

Skapa en Apache Spark-maskininlärningsmodell

  1. Skapa en notebook-fil med hjälp av PySpark-kerneln. Anvisningar finns i Skapa en anteckningsbok.

  2. Importera de typer som krävs för det här programmet. Kopiera och klistra in följande kod i en tom cell och tryck sedan på Skift+Retur. Eller kör cellen med hjälp av den blå uppspelningsikonen till vänster om koden.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    På grund av PySpark-kerneln behöver du inte skapa några kontexter explicit. Spark-kontexten skapas automatiskt åt dig när du kör den första kodcellen.

Skapa indataramen

Eftersom rådata är i Parquet-format kan du använda Spark-kontexten för att hämta filen till minnet som en DataFrame direkt. Även om koden i följande steg använder standardalternativen är det möjligt att framtvinga mappning av datatyper och andra schemaattribut om det behövs.

  1. Kör följande rader för att skapa en Spark DataFrame genom att klistra in koden i en ny cell. Det här steget hämtar data via API:et Open Datasets. Om du hämtar alla dessa data genereras cirka 1,5 miljarder rader.

    Beroende på storleken på din serverlösa Apache Spark-pool kan rådata vara för stora eller ta för lång tid att arbeta med. Du kan filtrera dessa data till något mindre. I följande kodexempel används start_date och end_date används ett filter som returnerar en enda månad med data.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. Nackdelen med enkel filtrering är att det ur ett statistiskt perspektiv kan introducera bias i data. En annan metod är att använda samplingen som är inbyggd i Spark.

    Följande kod minskar datauppsättningen till cirka 2 000 rader, om den tillämpas efter föregående kod. Du kan använda det här samplingssteget i stället för det enkla filtret eller tillsammans med det enkla filtret.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. Nu går det att titta på data för att se vad som lästes. Det är normalt bättre att granska data med en delmängd snarare än den fullständiga uppsättningen, beroende på datauppsättningens storlek.

    Följande kod erbjuder två sätt att visa data. Det första sättet är grundläggande. Det andra sättet ger en mycket rikare rutnätsupplevelse, tillsammans med möjligheten att visualisera data grafiskt.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. Beroende på storleken på den genererade datauppsättningen och ditt behov av att experimentera eller köra anteckningsboken många gånger kanske du vill cachelagras datauppsättningen lokalt på arbetsytan. Det finns tre sätt att utföra explicit cachelagring:

    • Spara DataFrame lokalt som en fil.
    • Spara DataFrame som en tillfällig tabell eller vy.
    • Spara DataFrame som en permanent tabell.

De två första metoderna ingår i följande kodexempel.

Att skapa en tillfällig tabell eller vy ger olika åtkomstsökvägar till data, men den varar bara under hela Spark-instanssessionen.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

Förbereda data

Data i dess råa form är ofta inte lämpliga för att skicka direkt till en modell. Du måste utföra en serie åtgärder på data för att få dem till ett tillstånd där modellen kan använda dem.

I följande kod utför du fyra åtgärdsklasser:

  • Borttagning av avvikande värden eller felaktiga värden genom filtrering.
  • Borttagning av kolumner som inte behövs.
  • Skapandet av nya kolumner som härleds från rådata för att få modellen att fungera mer effektivt. Den här åtgärden kallas ibland för funktionalisering.
  • Märkning. Eftersom du utför binär klassificering (kommer det att finnas ett tips eller inte på en viss resa), finns det ett behov av att konvertera tipsbeloppet till ett 0- eller 1-värde.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Sedan gör du en andra vidareströmning av data för att lägga till de sista funktionerna.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Skapa en logistisk regressionsmodell

Den sista uppgiften är att konvertera märkta data till ett format som kan analyseras via logistisk regression. Indata till en logistisk regressionsalgoritm måste vara en uppsättning etikett-/funktionsvektorpar, där funktionsvektorn är en vektor med tal som representerar indatapunkten.

Därför måste du konvertera de kategoriska kolumnerna till tal. Mer specifikt behöver du konvertera kolumnerna trafficTimeBins och weekdayString till heltalsrepresentationer. Det finns flera metoder för att utföra konverteringen. I följande exempel används OneHotEncoder metoden , vilket är vanligt.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Den här åtgärden resulterar i en ny DataFrame med alla kolumner i rätt format för att träna en modell.

Träna en logistisk regressionsmodell

Den första uppgiften är att dela upp datamängden i en träningsuppsättning och en test- eller valideringsuppsättning. Uppdelningen här är godtycklig. Experimentera med olika delningsinställningar för att se om de påverkar modellen.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nu när det finns två DataFrames är nästa uppgift att skapa modellformeln och köra den mot träningsdataramen. Sedan kan du verifiera mot testdataramen. Experimentera med olika versioner av modellformeln för att se effekten av olika kombinationer.

Anteckning

Om du vill spara modellen tilldelar du rollen Storage Blob Data Contributor till resursomfånget Azure SQL Database-server. Läs mer om att tilldela roller i Tilldela Azure-roller via Azure Portal. Endast medlemmar med ägarbehörighet kan utföra det här steget.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Utdata från den här cellen är:

Area under ROC = 0.9779470729751403

Skapa en visuell representation av förutsägelsen

Nu kan du skapa en slutlig visualisering som hjälper dig att resonera om resultatet av det här testet. En ROC-kurva är ett sätt att granska resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Diagram som visar ROC-kurvan för logistisk regression i tipsmodellen.

Stäng av Spark-instansen

När du har kört programmet stänger du anteckningsboken för att frigöra resurserna genom att stänga fliken. Eller välj Avsluta session på statuspanelen längst ned i anteckningsboken.

Se även

Nästa steg

Anteckning

En del av den officiella Apache Spark-dokumentationen förlitar sig på att använda Spark-konsolen, som inte är tillgänglig på Apache Spark i Azure Synapse Analytics. Använd notebook- eller IntelliJ-upplevelserna i stället.