Sdílet prostřednictvím


Vytvoření modelu strojového učení pomocí knihovny Apache Spark MLlib

V tomto článku se dozvíte, jak pomocí knihovny Apache Spark MLlib vytvořit aplikaci strojového učení, která zpracovává jednoduchou prediktivní analýzu v otevřené datové sadě Azure. Spark poskytuje integrované knihovny strojového učení. Tento příklad používá klasifikaci prostřednictvím logistické regrese.

Základní knihovny SparkML a MLlib Spark poskytují mnoho nástrojů, které jsou užitečné pro úlohy strojového učení. Tyto nástroje jsou vhodné pro:

  • Klasifikace
  • Clustering
  • Testování hypotéz a výpočet vzorové statistiky
  • Regrese
  • Rozklad hodnoty s jednotným číslem (SVD) a analýza hlavních komponent (PCA)
  • Modelování témat

Vysvětlení klasifikace a logistické regrese

Klasifikace, oblíbená úloha strojového učení, zahrnuje řazení vstupních dat do kategorií. Klasifikační algoritmus by měl zjistit, jak přiřadit popisky zadaným vstupním datům. Například algoritmus strojového učení může přijímat informace o akciích jako vstup a rozdělit akcie do dvou kategorií: akcie, které byste měli prodávat, a akcie, které byste měli zachovat.

Algoritmus logistické regrese je užitečný pro klasifikaci. Rozhraní API logistické regrese Sparku je užitečné pro binární klasifikaci vstupních dat do jedné ze dvou skupin. Další informace o logistické regresi najdete na Wikipedii.

Logistická regrese vytváří logistickou funkci , která dokáže předpovědět pravděpodobnost, že vstupní vektor patří do jedné skupiny nebo druhé.

Příklad prediktivní analýzy dat taxislužby NYC

Nejprve nainstalujte azureml-opendatasets. Data jsou k dispozici prostřednictvím prostředku Azure Open Datasets . Tato podmnožina datové sady hostuje informace o žlutých cestách taxíkem, včetně počátečních časů, časů ukončení, umístění startu, koncových umístění, nákladů na jízdu a dalších atributů.

%pip install azureml-opendatasets

Zbytek tohoto článku spoléhá na Apache Spark, aby nejprve provedl analýzu dat tipu taxi-výletu v NYC a pak vyvinul model, který předpovídá, jestli konkrétní cesta obsahuje tip, nebo ne.

Vytvoření modelu strojového učení Apache Sparku

  1. Vytvořte poznámkový blok PySpark. Další informace najdete v tématu Vytvoření poznámkového bloku.

  2. Importujte typy požadované pro tento poznámkový blok.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. MLflow použijeme ke sledování experimentů strojového učení a odpovídajících spuštění. Pokud je povolené automatické protokolování Microsoft Fabric, zachytí se automaticky odpovídající metriky a parametry.

    import mlflow
    

Vytvoření vstupního datového rámce

Tento příklad načte data do datového rámce Pandas a pak je převede na datový rámec Apache Spark. V takovém formátu můžeme k vyčištění a filtrování datové sady použít další operace Apache Sparku.

  1. Vložte tyto řádky do nové buňky a spusťte je a vytvořte datový rámec Sparku. Tento krok načte data prostřednictvím rozhraní API Open Datasets. Tato data můžeme filtrovat, abychom prozkoumali konkrétní okno dat. Příklad kódu používá start_date a end_date použije filtr, který vrátí jeden měsíc dat.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Tento kód zmenšuje datovou sadu na přibližně 10 000 řádků. Pokud chcete urychlit vývoj a trénování, ukázky kódu pro naši datovou sadu prozatím zrychlují.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Chceme se podívat na naše data pomocí integrovaného display() příkazu. Pomocí tohoto příkazu můžeme snadno zobrazit ukázku dat nebo graficky prozkoumat trendy v datech.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Příprava dat

Příprava dat je zásadním krokem v procesu strojového učení. Zahrnuje čištění, transformaci a organizaci nezpracovaných dat, aby byla vhodná pro analýzu a modelování. V této ukázce kódu provedete několik kroků přípravy dat:

  • Filtrováním datové sady odeberte odlehlé hodnoty a nesprávné hodnoty.
  • Odebrání sloupců, které nejsou potřeba pro trénování modelu
  • Vytvoření nových sloupců z nezpracovaných dat
  • Vygenerování popisku pro určení, jestli daná cesta taxi zahrnuje tip
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

V dalším kroku proveďte druhé předání dat pro přidání konečných funkcí.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Vytvoření logistického regresního modelu

Poslední úloha převede označená data do formátu, který může logistická regrese zpracovat. Vstup do logistického regresního algoritmu musí mít strukturu dvojic vektorů popisku/funkce, kde vektor funkce je vektor čísla, který představuje vstupní bod.

Na základě požadavků na konečný úkol musíme převést sloupce kategorií na čísla. Konkrétně musíme převést trafficTimeBins sloupce na weekdayString celočíselné reprezentace. Pro zpracování tohoto požadavku máme k dispozici řadu možností. Tento příklad zahrnuje OneHotEncoder přístup:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Výsledkem této akce je nový datový rámec se všemi sloupci ve správném formátu pro trénování modelu.

Trénování logistického regresního modelu

První úloha rozdělí datovou sadu na trénovací sadu a testovací nebo ověřovací sadu.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Jakmile máme dva datové rámce, musíme vytvořit vzorec modelu a spustit ho s trénovacím datovým rámcem. Pak můžeme ověřit testovací datový rámec. Experimentujte s různými verzemi vzorce modelu, abyste viděli efekty různých kombinací.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Výstupy buňky:

Area under ROC = 0.9749430523917996

Vytvoření vizuální reprezentace předpovědi

Teď můžeme vytvořit konečnou vizualizaci pro interpretaci výsledků modelu. Křivka ROC může jistě prezentovat výsledek.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graf znázorňující křivku ROC pro logistickou regresi v modelu špičky