Læs på engelsk

Del via


Byg en model til maskinel indlæring med Apache Spark MLlib

I denne artikel får du mere at vide om, hvordan du bruger Apache Spark MLlib til at oprette et program til maskinel indlæring, der håndterer enkel forudsigende analyse på et Azure Open-datasæt. Spark leverer indbyggede biblioteker til maskinel indlæring. I dette eksempel bruges klassificering via logistisk regression.

Kernebibliotekerne SparkML og MLlib Spark indeholder mange hjælpeprogrammer, der er nyttige til opgaver i forbindelse med maskinel indlæring. Disse hjælpeprogrammer er velegnede til:

  • Klassificering
  • Klyngedannelse
  • Hypotesetest og beregning af eksempelstatistik
  • Regression
  • SVD (Singular Value Decomposition) og PCA (Principal Component Analysis)
  • Emnemodellering

Forstå klassificering og logistisk regression

Klassificering, der er en populær maskinel indlæringsopgave, omfatter sortering af inputdata i kategorier. En klassificeringsalgoritme skal finde ud af, hvordan du tildeler mærkater til de angivne inputdata. En algoritme til maskinel indlæring kan f.eks. acceptere aktieoplysninger som input og opdele bestanden i to kategorier: aktier, som du skal sælge, og aktier, som du skal beholde.

Logistisk regressionsalgoritme er nyttig til klassificering. Spark-logistisk regressions-API er nyttig til binær klassificering af inputdata i en af to grupper. Du kan få flere oplysninger om logistisk regression under Wikipedia.

Logistisk regression producerer en logistisk funktion , der kan forudsige sandsynligheden for, at en inputvektor tilhører den ene eller den anden gruppe.

Eksempel på forudsigende analyse af NYC-taxadata

Først skal du installere azureml-opendatasets. Dataene er tilgængelige via azure Open Datasets-ressourcen. Dette datasætundersæt hoster oplysninger om gule taxature, herunder starttidspunkter, sluttidspunkter, startplaceringer, slutplaceringer, rejseomkostninger og andre attributter.

%pip install azureml-opendatasets

Resten af denne artikel er afhængig af Apache Spark for først at udføre en analyse af DATAENE for NYC-drikkepenge og derefter udvikle en model for at forudsige, om en bestemt tur indeholder et tip eller ej.

Opret en Apache Spark-model til maskinel indlæring

  1. Opret en PySpark-notesbog. Du kan finde flere oplysninger under Opret en notesbog.

  2. Importér de typer, der kræves til denne notesbog.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Vi bruger MLflow til at spore vores eksperimenter med maskinel indlæring og tilsvarende kørsler. Hvis Microsoft Fabric Autologging er aktiveret, registreres de tilsvarende målepunkter og parametre automatisk.

    import mlflow
    

Konstruer inputdatarammen

Dette eksempel indlæser dataene i en Pandas-dataramme og konverterer dem derefter til en Apache Spark-dataramme. I dette format kan vi anvende andre Apache Spark-handlinger for at rense og filtrere datasættet.

  1. Indsæt disse linjer i en ny celle, og kør dem for at oprette en Spark DataFrame. Dette trin henter dataene via API'en Open Datasets. Vi kan filtrere disse data ned for at undersøge et bestemt vindue med data. Kodeeksempel bruger start_date og end_date anvender et filter, der returnerer en enkelt måneds data.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Denne kode reducerer datasættet til ca. 10.000 rækker. Kodeeksempler ned i vores datasæt indtil videre for at fremskynde udviklingen og oplæringen.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Vi vil gerne se på vores data ved hjælp af den indbyggede display() kommando. Med denne kommando kan vi nemt få vist et dataeksempel eller udforske tendenser i dataene grafisk.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Forbered dataene

Dataforberedelse er et afgørende trin i processen til maskinel indlæring. Det omfatter rengøring, transformation og organisering af rådata for at gøre dem egnede til analyse og modellering. I dette kodeeksempel udfører du flere trin til dataforberedelse:

  • Filtrer datasættet for at fjerne udenforliggende værdier og forkerte værdier
  • Fjern kolonner, der ikke er nødvendige til modeltræning
  • Opret nye kolonner ud fra rådata
  • Opret en mærkat for at bestemme, om en given taxatur omfatter et tip eller ej
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Derefter skal du foretage endnu et pass over dataene for at tilføje de endelige funktioner.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Opret en logistisk regressionsmodel

Den endelige opgave konverterer de navngivne data til et format, som logistisk regression kan håndtere. Inputtet til en logistisk regressionsalgoritme skal have en mærkat-/funktionsvektorparstruktur, hvor funktionsvektoren er en vektor af tal, der repræsenterer inputpunktet.

Baseret på de endelige opgavekrav skal vi konvertere de kategoriske kolonner til tal. Vi skal specifikt konvertere kolonnerne trafficTimeBins og weekdayString til heltalsrepræsentationer. Vi har mange muligheder til at håndtere dette krav. Dette eksempel omfatter tilgangen OneHotEncoder :

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Denne handling resulterer i en ny DataFrame med alle kolonner i det korrekte format for at oplære en model.

Oplær en logistisk regressionsmodel

Den første opgave opdeler datasættet i et oplæringssæt og et test- eller valideringssæt.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Når vi har to DataFrames, skal vi oprette modelformlen og køre den mod træningsdatarammen. Derefter kan vi validere i forhold til testdataframen. Eksperimentér med forskellige versioner af modelformlen for at se effekten af forskellige kombinationer.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Celleoutputtet:

Area under ROC = 0.9749430523917996

Opret en visuel gengivelse af forudsigelsen

Vi kan nu oprette en endelig visualisering for at fortolke modelresultaterne. En ROC-kurve kan helt sikkert præsentere resultatet.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf, der viser ROC-kurven for logistisk regression i tipmodellen.