Megosztás a következőn keresztül:


Gépi tanulási modell létrehozása az Apache Spark MLlib használatával

Ebből a cikkből megtudhatja, hogyan hozhat létre olyan gépi tanulási alkalmazást az Apache Spark MLlib használatával, amely egyszerű prediktív elemzéseket kezel egy Azure-beli nyílt adathalmazon. A Spark beépített gépi tanulási kódtárakat biztosít. Ez a példa logisztikai regresszióval történő besorolást használ.

Az alapvető SparkML- és MLlib Spark-kódtárak számos olyan segédprogramot biztosítanak, amelyek hasznosak a gépi tanulási feladatokhoz. Ezek a segédprogramok a következőkre alkalmasak:

  • Osztályozás
  • Fürtözés
  • Hipotézistesztelés és mintastatisztikák kiszámítása
  • Regresszió
  • Szingular value decomposition (SVD) és a fő összetevő elemzése (PCA)
  • Témakörmodellezés

A besorolás és a logisztikai regresszió ismertetése

A népszerű gépi tanulási feladat, a besorolás magában foglalja a bemeneti adatok kategóriákba rendezését. A besorolási algoritmusnak ki kell találnia, hogyan rendelhet címkéket a megadott bemeneti adatokhoz. Egy gépi tanulási algoritmus például elfogadhatja a készletadatokat bemenetként, és két kategóriába oszthatja az állományt: az eladandó készleteket és a megtartandó készleteket.

A logisztikai regressziós algoritmus a besoroláshoz hasznos. A Spark logisztikai regressziós API hasznos a bemeneti adatok bináris besorolásához két csoport egyikébe. A logisztikai regresszióval kapcsolatos további információkért lásd a Wikipédiát.

A logisztikai regresszió olyan logisztikai függvényt hoz létre, amely előrejelzi annak valószínűségét, hogy egy bemeneti vektor az egyik csoporthoz vagy a másikhoz tartozik.

Prediktív elemzési példa a NYC taxiadataira

Először telepítse a elemet azureml-opendatasets. Az adatok az Azure Open Datasets erőforráson keresztül érhetők el. Ez az adatkészlet-részhalmaz információkat tartalmaz a sárga taxiutakról, beleértve a kezdési időpontokat, a befejezési időpontokat, a kezdési helyeket, a véghelyeket, az utazás költségeit és egyéb attribútumokat.

%pip install azureml-opendatasets

A cikk további része az Apache Sparkra támaszkodik, hogy először végezzen elemzést az NYC taxiút-tipp adatairól, majd dolgozzon ki egy modellt annak előrejelzésére, hogy egy adott utazás tartalmaz-e tippet vagy sem.

Apache Spark gépi tanulási modell létrehozása

  1. PySpark-jegyzetfüzet létrehozása. További információt a Jegyzetfüzet létrehozása című témakörben talál.

  2. Importálja a jegyzetfüzethez szükséges típusokat.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Az MLflow használatával nyomon követjük a gépi tanulási kísérleteket és a megfelelő futtatásokat. Ha a Microsoft Fabric automatikus naplózása engedélyezve van, a rendszer automatikusan rögzíti a megfelelő metrikákat és paramétereket.

    import mlflow
    

A bemeneti DataFrame létrehozása

Ez a példa betölti az adatokat egy Pandas-adatkeretbe, majd Apache Spark-adatkeretté alakítja. Ebben a formátumban más Apache Spark-műveleteket is alkalmazhatunk az adathalmaz tisztítására és szűrésére.

  1. Illessze be ezeket a sorokat egy új cellába, és futtassa őket Spark DataFrame létrehozásához. Ez a lépés az Open Datasets API-val kéri le az adatokat. Ezeket az adatokat szűrhetjük egy adott adatablak vizsgálatához. A példakód egy olyan szűrőt használ start_date és end_date alkalmaz, amely egyetlen hónapnyi adatot ad vissza.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Ez a kód körülbelül 10 000 sorra csökkenti az adathalmazt. A fejlesztés és a betanítás felgyorsítása érdekében a kód egyelőre az adatkészletet mintázta.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. A beépített display() paranccsal szeretnénk áttekinteni az adatokat. Ezzel a paranccsal egyszerűen megtekintheti az adatmintát, vagy grafikusan feltárhatja az adatok trendjeit.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Az adatok előkészítése

Az adatok előkészítése a gépi tanulási folyamat kulcsfontosságú lépése. Ez magában foglalja a nyers adatok tisztítását, átalakítását és rendszerezését, hogy alkalmassá tegye az elemzésre és modellezésre. Ebben a kódmintában számos adatelőkészítési lépést hajt végre:

  • Az adathalmaz szűrése a kiugró értékek és a helytelen értékek eltávolításához
  • A modell betanításához nem szükséges oszlopok eltávolítása
  • Új oszlopok létrehozása a nyers adatokból
  • Címke létrehozása annak megállapításához, hogy egy adott taxiúthoz tartozik-e tipp
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

A következő lépésben adja át az adatokat egy második lépéssel a végleges funkciók hozzáadásához.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Logisztikai regressziós modell létrehozása

Az utolsó feladat a címkézett adatokat olyan formátummá alakítja, amelyet a logisztikai regresszió kezelni tud. A logisztikai regressziós algoritmus bemenetének címke-/funkcióvektorpár-struktúrával kell rendelkeznie, ahol a funkcióvektor a bemeneti pontot jelképező számok vektora.

A végső feladatkövetelmények alapján számmá kell konvertálnunk a kategorikus oszlopokat. Pontosabban az és weekdayString az trafficTimeBins oszlopokat egész számábrázolássá kell konvertálnunk. Ennek a követelménynek a kezelésére számos lehetőség áll rendelkezésre. Ez a példa a OneHotEncoder következő megközelítést foglalja magában:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Ez a művelet egy új DataFrame-et eredményez, amelynek minden oszlopa megfelelő formátumban van a modell betanítása érdekében.

Logisztikai regressziós modell betanítása

Az első feladat az adathalmazt egy betanítási csoportra, valamint egy tesztelési vagy érvényesítési csoportra osztja fel.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Ha már két DataFrame-ünk van, létre kell hoznunk a modellképletet, és le kell futtatnunk a betanítási DataFrame-ben. Ezután érvényesíthetjük a teszt dataFrame-et. Kísérletezzen a modellképlet különböző verzióival a különböző kombinációk hatásainak megtekintéséhez.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

A cellakimenetek:

Area under ROC = 0.9749430523917996

Az előrejelzés vizuális ábrázolásának létrehozása

Most már létrehozhatunk egy végleges vizualizációt a modell eredményeinek értelmezéséhez. A ROC-görbe minden bizonnyal megjeleníti az eredményt.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

A logisztikai regresszió ROC-görbét ábrázoló grafikon a tippmodellben.