Byg en model til maskinel indlæring med Apache Spark MLlib

Vigtigt

Microsoft Fabric fås som prøveversion.

I denne artikel får du mere at vide om, hvordan du bruger Apache Spark MLlib til at oprette et program til maskinel indlæring, der udfører simple forudsigende analyser på et åbent Azure-datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.

SparkML og MLlib er kernebiblioteker i Spark, der leverer mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring, herunder hjælpeprogrammer, der er velegnede til:

  • Klassificering
  • Regression
  • Clustering
  • Emnemodellering
  • SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
  • Hypotesetest og beregning af eksempelstatistik

Forstå klassificering og logistisk regression

Klassificering, som er en populær maskinel indlæringsopgave, er processen med at sortere inputdata i kategorier. Det er en klassificeringsalgoritmes opgave at finde ud af, hvordan du tildeler mærkater til inputdata, som du angiver. Du kan f.eks. tænke på en maskinel indlæringsalgoritme, der accepterer aktieoplysninger som input og opdeler aktien i to kategorier: aktier, som du skal sælge, og aktier, som du skal beholde.

Logistisk regression er en algoritme, som du kan bruge til klassificering. Sparks logistiske regressions-API er nyttig til binær klassificering eller klassificering af inputdata i en af to grupper. Du kan finde flere oplysninger om logistisk regression under Wikipedia.

Kort sagt opretter processen med logistisk regression en logistisk funktion , som du kan bruge til at forudsige sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.

Eksempel på forudsigende analyse på NYC-taxadata

Du kommer i gang ved at installere azureml-opendatasets. Dataene er tilgængelige via Azure Open Datasets. Dette undersæt af datasættet indeholder oplysninger om gule taxature, herunder start- og sluttidspunkt og placeringer, omkostninger og andre attributter.

%pip install azureml-opendatasets

I resten af denne artikel bruger vi Apache Spark til at udføre nogle analyser af TIP-dataene for NYC-taxaturen og derefter udvikle en model for at forudsige, om en bestemt tur indeholder et tip eller ej.

Opret en Apache Spark-model til maskinel indlæring

  1. Opret en PySpark-notesbog. Du kan finde en vejledning under Opret en notesbog.

  2. Importér de typer, der kræves til denne notesbog.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi bruger MLflow til at spore vores eksperimenter med maskinel indlæring og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.

    import mlflow
    

Konstruer inputdatarammen

I dette eksempel indlæser vi dataene i en Pandas-dataramme og konverterer dem derefter til en Apache Spark-dataramme. Ved hjælp af dette format kan vi anvende andre Apache Spark-handlinger til at rense og filtrere datasættet.

  1. Kør følgende linjer for at oprette en Spark DataFrame ved at indsætte koden i en ny celle. Dette trin henter dataene via API'en For åbne datasæt. Vi kan filtrere disse data ned for at se på et bestemt vindue med data. I følgende kodeeksempel bruges start_date og end_date anvendes et filter, der returnerer en enkelt måned med data.

     from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc__pd).repartition(20)
    
    
  2. Følgende kode reducerer datasættet til omkring 10.000 rækker. For at fremskynde udviklingen og oplæringen vil vi prøve vores datasæt ned indtil videre.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Derefter vil vi se nærmere på vores data ved hjælp af den indbyggede display() kommando. Det giver os mulighed for nemt at få vist et eksempel på dataene eller udforske tendenserne i dataene grafisk.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Forbered dataene

Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det omfatter rensning, transformering og organisering af rådata for at gøre dem egnet til analyse og modellering. I følgende kode udfører du flere trin til dataforberedelse:

  • Fjern afvigende værdier og forkerte værdier ved at filtrere datasættet
  • Fjern kolonner, der ikke er nødvendige til modeltræning
  • Opret nye kolonner ud fra rådata
  • Opret en mærkat for at bestemme, om der vil være et tip eller ej for den angivne Taxa-tur
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Vi foretager derefter endnu et pass over dataene for at tilføje de endelige funktioner.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opret en logistisk regressionsmodel

Den sidste opgave er at konvertere de navngivne data til et format, der kan analyseres via logistisk regression. Inputtet til en logistisk regressionsalgoritme skal være et sæt mærkat-/funktionsvektorpar, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.

Så du skal konvertere de kategoriske kolonner til tal. Du skal specifikt konvertere kolonnerne trafficTimeBins og weekdayString til heltalsrepræsentationer. Der er flere måder at udføre konverteringen på. I følgende eksempel bruges tilgangen OneHotEncoder .

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handling resulterer i en ny DataFrame med alle kolonner i det rigtige format til at oplære en model.

Oplær en logistisk regressionsmodel

Den første opgave er at opdele datasættet i et oplæringssæt og et test- eller valideringssæt.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nu, hvor der er to DataFrames, er den næste opgave at oprette modelformlen og køre den mod oplæringen DataFrame. Derefter kan du validere i forhold til test af DataFrame. Eksperimentér med forskellige versioner af modelformlen for at se virkningen af forskellige kombinationer.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Outputtet fra denne celle er:

Area under ROC = 0.9749430523917996

Opret en visuel repræsentation af forudsigelsen

Du kan nu konstruere en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve er én måde at gennemse resultatet på.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf, der viser ROC-kurven for logistisk regression i tipmodellen.

Næste trin