다음을 통해 공유


Apache Spark MLlib을 사용하여 기계 학습 모델 빌드하기

이 문서에서는 Apache Spark MLlib을 사용하여 Azure 공개 데이터 세트에서 간단한 예측 분석을 수행하는 기계 학습 애플리케이션을 만드는 방법을 알아봅니다. Spark는 기본 제공 기계 학습 라이브러리를 제공합니다. 이 예제에서는 로지스틱 회귀를 통한 분류를 사용합니다.

SparkML 및 MLlib은 기계 학습 작업에 유용한 많은 유틸리티를 제공하는 핵심 Spark 라이브러리입니다. 이러한 유틸리티는 다음에 적합합니다.

  • 분류
  • Clustering
  • 가설 테스트 및 샘플 통계 계산
  • 회귀
  • SVD(특이값 분해) 및 PCA(주성분 분석)
  • 항목 모델링

분류 및 로지스틱 회귀의 이해

널리 사용되는 Machine Learning 작업인 분류는 입력 데이터를 범주로 정렬하는 프로세스입니다. 분류 알고리즘은 제공된 입력 데이터에 레이블을 할당하는 방법을 알아내야 합니다. 예를 들어 입력으로 주식 정보를 받아들이고 판매할 주식과 보유할 주식의 두 가지 범주로 나누는 기계 학습 알고리즘을 생각할 수 있습니다.

로지스틱 회귀는 분류할 때 도움을 주는 알고리즘입니다. Spark의 로지스틱 회귀 API는 입력 데이터를 이진 분류하여 두 그룹 중 하나로 분류하는 데 유용합니다. 로지스틱 회귀에 대한 자세한 내용은 Wikipedia를 참조하세요.

로지스틱 회귀 프로세스는 입력 벡터가 한 그룹 또는 다른 그룹에 속할 확률을 예측하는 데 사용할 수 있는 로지스틱 함수 를 만듭니다.

NYC 택시 데이터에 대한 예측 분석 예제

먼저 azureml-opendatasets를 설치합니다. 데이터는 Azure 공개 데이터 세트 리소스를 통해 사용할 수 있습니다. 이 데이터 세트 하위 집합은 시작 시간, 종료 시간, 시작 위치, 종료 위치, 여정 비용 및 기타 특성을 포함하여 노란색 택시 여정에 대한 정보를 호스팅합니다.

%pip install azureml-opendatasets

이 문서의 나머지 내용은 Apache Spark를 사용하여 먼저 NYC Taxi-trip 팁 데이터에 대한 몇 가지 분석을 수행한 다음 특정 여정에 팁이 포함되어 있는지 여부를 예측하는 모델을 개발합니다.

Apache Spark 기계 학습 모델 만들기

  1. PySpark Notebook을 만듭니다. 자세한 내용은 Notebook 만들기를 참조하세요.

  2. 이 Notebook에 필요한 형식을 가져옵니다.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. MLflow를 사용하여 기계 학습 실험 및 해당 실행을 추적합니다. Microsoft Fabric 자동 로깅을 사용하도록 설정하면 해당 메트릭 및 매개 변수가 자동으로 캡처됩니다.

    import mlflow
    

입력 DataFrame 생성

다음은 Pandas 데이터 프레임에 데이터를 로드한 다음 Apache Spark 데이터 프레임으로 변환하는 예제입니다. 해당 형식으로 다른 Apache Spark 작업을 적용하여 데이터 세트를 정리하고 필터링할 수 있습니다.

  1. 이러한 줄을 새 셀에 붙여넣고 실행하여 Spark DataFrame을 만듭니다. 이 단계는 Open Datasets API를 통해 데이터를 검색합니다. 이 데이터를 필터링하여 특정 데이터 창을 검사할 수 있습니다. 다음 코드 예제에서는 start_dateend_date(을)를 사용하여 한 달 분량의 데이터를 반환하는 필터를 적용합니다.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. 이 코드는 데이터 세트를 약 10,000개의 행으로 줄입니다. 개발 및 학습 속도를 높이기 위해 코드는 현재 데이터 세트를 샘플링합니다.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. 기본 제공 display() 명령을 사용하여 데이터를 확인하려고 합니다. 이 명령을 사용하면 데이터 샘플을 쉽게 보거나 데이터의 추세를 그래픽으로 탐색할 수 있습니다.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

데이터 준비

데이터 준비는 기계 학습 프로세스에서 중요한 단계입니다. 분석 및 모델링에 적합하도록 원시 데이터의 정리, 변환 및 구성이 포함됩니다. 이 코드 샘플에서는 다음과 같은 몇 가지 데이터 준비 단계를 수행합니다.

  • 이상값 및 잘못된 값을 제거하도록 데이터 세트를 필터링합니다.
  • 모델 학습에 필요하지 않은 열 제거
  • 원시 데이터에서 새 열 만들기
  • 레이블을 생성하여 지정된 택시 여정에 팁이 포함되는지 여부를 확인합니다.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

그런 다음, 데이터를 두 번째로 전달하여 최종 기능을 추가합니다.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

로지스틱 회귀 모델 만들기

마지막 작업은 레이블이 지정된 데이터를 로지스틱 회귀를 통해 핸들할 수 있는 형식으로 변환하는 것입니다. 로지스틱 회귀 알고리즘에 대한 입력은 레이블/기능 벡터 쌍의 세트여야 하며, 여기서 기능 벡터는 입력 지점을 나타내는 숫자의 벡터입니다.

최종 작업 요구 사항에 따라 범주 열을 숫자로 변환해야 합니다. 특히 trafficTimeBinsweekdayString 열을 정수 표현으로 변환해야 합니다. 이 요구 사항을 처리할 수 있는 많은 옵션이 있습니다. 이 예제에는 다음 OneHotEncoder 방법이 포함됩니다.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

이 작업을 수행하면 모델을 학습시키는 데 적합한 형식의 모든 열이 포함된 새 DataFrame이 생성됩니다.

로지스틱 회귀 모델 학습

첫 번째 작업은 데이터 세트를 학습 세트와 테스트 또는 유효성 검사 세트로 분할하는 것입니다.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

두 개의 DataFrame이 있으면 모델 수식을 만들고 학습 DataFrame에 대해 실행해야 합니다. 그런 다음, 테스트 DataFrame에 대해 유효성을 검사할 수 있습니다. 여러 버전의 모델 수식을 실험하여 다양한 조합의 영향을 확인합니다.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

셀은 다음을 출력합니다.

Area under ROC = 0.9749430523917996

예측의 시각적 표현 만들기

이제 최종 시각화를 빌드하여 모델 결과를 해석할 수 있습니다. ROC 곡선은 확실히 결과를 표시할 수 있습니다.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

팁 모델의 로지스틱 회귀 분석에 대한 ROC 곡선을 보여 주는 그래프입니다.