Jaa


Koneoppimismallin luominen Apache Spark MLlib:n avulla

Tässä artikkelissa kerrotaan, miten voit Apache Spark MLlibin avulla luoda koneoppimissovelluksen, joka käsittelee yksinkertaista ennakoivaa analyysia Azuren avoimesta tietojoukosta. Spark tarjoaa sisäisiä koneoppimiskirjastoja. Tässä esimerkissä luokitusta käytetään logistista regressiota käyttämällä.

SparkML- ja MLlib Spark -ydinkirjastot tarjoavat monia apuohjelmia, joista on hyötyä koneoppimistehtävissä. Nämä apuohjelmat soveltuvat seuraaviin:

  • Luokitus
  • Klusterointi
  • Hypoteesitestaus ja mallitilastojen laskeminen
  • Regressio
  • SVD-hajotus ja pääosa-analyysi (PCA)
  • Aiheen mallinnus

Tutustu luokitukseen ja logistiseen regressioon

Luokittelu, suosittu koneoppimistehtävä, sisältää syötetietojen lajittelemisen luokkiin. Luokitusalgoritmin tulisi selvittää, miten annetuille syötetiedille määritetään otsikoita . Esimerkiksi koneoppimisalgoritmi voisi hyväksyä varastotiedot syötteeksi ja jakaa osakkeet kahteen luokkaan: osakkeisiin, joita sinun pitäisi myydä, ja osakkeisiin, jotka sinun pitäisi säilyttää.

Logistinen regressioalgoritmi on hyödyllinen luokituksessa. Spark-logistinen regressio-ohjelmointirajapinta on hyödyllinen syötetietojen binaariluokittelussa yhteen kahdesta ryhmästä. Lisätietoja logistisesta regressiosta on Wikipediassa.

Logistinen regressio tuottaa logistisen funktion , joka voi ennustaa todennäköisyyden sille, että syötevektori kuuluu yhteen tai toiseen ryhmään.

Ennakoiva analyysi esimerkki New Yorkin kaupungin taksitiedoista

Asenna azureml-opendatasetsensin . Tiedot ovat käytettävissä Azure Open Datasets -resurssin kautta. Tässä tietojoukon alijoukossa isännöidä tietoja keltaisista taksimatkoista, mukaan lukien alkamisajat, päättymisajat, alkamissijainnit, päättymissijainnit, matkakustannukset ja muut määritteet.

%pip install azureml-opendatasets

Artikkelin loppuosa käyttää Apache Sparkiä ensin tekemään analyysin NYC:n taksimatkan vinkkitiedoista ja kehittämään sitten mallin, joka ennustaa, sisältääkö tietty matka vinkin vai ei.

Luo Apache Spark -koneoppimismalli

  1. Luo PySpark-muistikirja. Lisätietoja on kohdassa Muistikirjan luominen.

  2. Tuo tähän muistikirjaan tarvittavat tyypit.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Käytämme MLflow-funktiota koneoppimiskokeilujen ja sitä vastaavien suoritusten seurantaan. Jos Microsoft Fabricin automaattinen loggaus on käytössä, vastaavat mittarit ja parametrit siepataan automaattisesti.

    import mlflow
    

Muodosta syötteen DataFrame

Tämä esimerkki lataa tiedot Pandas-tietokehykseen ja muuntaa ne sitten Apache Spark -tietokehykseksi. Tässä muodossa voimme käyttää muita Apache Spark -toimintoja tietojoukon siistimiseksi ja suodattamiseksi.

  1. Liitä nämä rivit uuteen soluun ja suorita ne Spark DataFrame -kehyksen luomiseksi. Tämä vaihe noutaa tiedot Avointen tietojoukkojen ohjelmointirajapinnan kautta. Voimme suodattaa nämä tiedot alaspäin ja tarkastella tiettyä tietoikkunaa. Koodiesimerkki käyttää start_date ja end_date käyttää suodatinta, joka palauttaa yhden kuukauden tiedot.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Tämä koodi pienentää tietojoukon noin 10 000 riviin. Jos haluat nopeuttaa kehitystä ja koulutusta, koodi ottaa näytteitä tietojoukostamme toistaiseksi.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Haluamme tarkastella tietojamme käyttämällä sisäistä komentoa display() . Tämän komennon avulla voimme helposti tarkastella tietomallia tai tutkia tietojen trendejä graafisesti.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Tietojen valmistelu

Tietojen valmistelu on tärkeä vaihe koneoppimisprosessissa. Se sisältää siistimisen, muuntamisen ja raakadatan järjestämisen, jotta se sopii analysointiin ja mallinnukseen. Tässä koodimallissa suoritat useita tietojen valmistelun vaiheita:

  • Poista poikkeavat arvot ja virheelliset arvot suodattamalla tietojoukko
  • Poista sarakkeet, joita ei tarvita mallin harjoittamisessa
  • Luo uusia sarakkeita raakatiedoista
  • Luo tunniste sen määrittämiseksi, liittyykö annettuun taksimatkaan vinkki
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Lisää sitten lopulliset ominaisuudet lisäämällä tietojen toinen välitys.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Logistisen regressiomallin luominen

Viimeisessä tehtävässä otsikoidut tiedot muunnetaan muotoon, jota logistinen regressio pystyy käsittelemään. Logistisen regressioalgoritmin syötteellä on oltava otsikko-/ominaisuusvektoriparien rakenne, jossa ominaisuusvektori on syötepistettä edustavien lukujen vektori.

Lopullisten tehtävän vaatimusten perusteella luokittaiset sarakkeet on muunnettava luvuiksi. Ja -sarakkeet on muunnettava trafficTimeBins weekdayString kokonaislukumuotoiseksi esitykseksi. Meillä on käytettävissämme monia vaihtoehtoja tämän vaatimuksen käsittelemiseksi. Tässä esimerkissä käsitellään seuraavaa OneHotEncoder lähestymistapaa:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Tämän toiminnon tuloksena saadaan uusi DataFrame, jossa kaikki sarakkeet ovat oikeassa muodossa mallin harjoittamiseksi.

Logistisen regressiomallin harjoittaminen

Ensimmäisessä tehtävässä tietojoukko jaetaan harjoitusjoukkoon ja testaus- tai vahvistusjoukkoon.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Kun käytössä on kaksi DataFrame-kehystä, meidän täytyy luoda mallikaava ja suorittaa se harjoittamisen DataFrame-kehyksessä. Sitten voimme vahvistaa testin dataFrame-kehyksen avulla. Kokeile mallikaavan eri versioita, jotta näet eri yhdistelmien vaikutukset.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Solun tuloksena on:

Area under ROC = 0.9749430523917996

Visuaalisen esityksen luominen ennusteesta

Voimme nyt luoda lopullisen visualisoinnin mallin tulosten tulkitsemiseksi. ROC-käyrä voi varmasti esittää tuloksen.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Kaavio, joka näyttää logistista regressiota vastaavan ROC-käyrän kärkimallissa.