Apache Spark MLlib를 사용하여 기계 학습 모델 빌드

이 문서에서는 Apache Spark MLlib를 사용하여 Azure 공개 데이터 세트에 대한 간단한 예측 분석을 수행하는 기계 학습 애플리케이션을 만드는 방법에 대해 알아봅니다. Spark는 기본 제공 기계 학습 라이브러리를 제공합니다. 이 예제에서는 로지스틱 회귀를 통해 분류를 사용합니다.

SparkML 및 MLlib는 다음에 적합한 유틸리티를 포함하여 기계 학습 작업에 유용한 많은 유틸리티를 제공하는 핵심 Spark 라이브러리입니다.

  • 분류
  • 회귀
  • Clustering
  • 토픽 모델링
  • SVD(단수 값 분해) 및 PCA(보안 주체 분석)
  • 가설 테스트 및 계산 샘플 통계

분류 및 로지스틱 회귀의 이해

널리 사용되는 기계 학습 작업인 분류는 입력 데이터를 범주로 정렬하는 프로세스입니다. 제공하는 입력 데이터에 레이블을 할당하는 방법을 파악하는 것은 분류 알고리즘의 작업입니다. 예를 들어 입력으로 주식 정보를 받아들이고 판매할 주식과 보유할 주식의 두 가지 범주로 나누는 기계 학습 알고리즘을 생각할 수 있습니다.

로지스틱 회귀는 분류에 사용할 수 있는 알고리즘입니다. Spark의 로지스틱 회귀 API는 이진 분류 또는 입력 데이터를 두 그룹 중 하나로 분류하는 데 유용합니다. 로지스틱 회귀에 대한 자세한 내용은 Wikipedia를 참조하세요.

요약하자면, 로지스 틱 회귀 프로세스는 입력 벡터가 한 그룹 또는 다른 그룹에 속할 확률을 예측하는 데 사용할 수 있는 로지스틱 함수 를 생성합니다.

NYC 택시 데이터에 대한 예측 분석 예제

시작하려면 .를 설치합니다 azureml-opendatasets. 데이터는 Azure Open Datasets를 통해 사용할 수 있습니다. 데이터 세트의 이 하위 집합에는 시작 및 종료 시간 및 위치, 비용 및 기타 특성을 포함하여 노란색 택시 여정에 대한 정보가 포함됩니다.

%pip install azureml-opendatasets

이 문서의 나머지 부분에는 Apache Spark를 사용하여 NYC 택시 여행 팁 데이터에 대한 분석을 수행한 다음 특정 여정에 팁이 포함되어 있는지 여부를 예측하는 모델을 개발합니다.

Apache Spark 기계 학습 모델 만들기

  1. PySpark Notebook을 만듭니다. 자세한 지침은 Notebook 만들기를 참조하세요.

  2. 이 Notebook에 필요한 형식을 가져옵니다.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. MLflow를 사용하여 기계 학습 실험 및 해당 실행을 추적합니다. Microsoft Fabric 자동 로깅을 사용하도록 설정하면 해당 메트릭 및 매개 변수가 자동으로 캡처됩니다.

    import mlflow
    

입력 데이터 프레임 생성

이 예제에서는 Pandas 데이터 프레임에 데이터를 로드한 다음 Apache Spark 데이터 프레임으로 변환합니다. 이 형식을 사용하여 다른 Apache Spark 작업을 적용하여 데이터 세트를 클린 필터링할 수 있습니다.

  1. 다음 줄을 실행하여 코드를 새 셀에 붙여넣어 Spark DataFrame을 만듭니다. 이 단계에서는 Open Datasets API를 통해 데이터를 검색합니다. 이 데이터를 필터링하여 특정 데이터 창을 살펴볼 수 있습니다. 다음 코드 예제에서는 한 달의 데이터를 반환하는 필터를 사용하고 start_dateend_date 적용합니다.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. 다음 코드는 데이터 세트를 약 10,000개의 행으로 줄입니다. 개발 및 학습 속도를 높이기 위해 현재 데이터 세트를 샘플링합니다.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. 다음으로, 기본 제공 display() 명령을 사용하여 데이터를 살펴보겠습니다. 이를 통해 데이터 샘플을 쉽게 보거나 데이터의 추세를 그래픽으로 탐색할 수 있습니다.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

데이터 준비

데이터 준비는 기계 학습 프로세스에서 중요한 단계입니다. 분석 및 모델링에 적합하도록 원시 데이터를 클린, 변환 및 구성하는 작업이 포함됩니다. 다음 코드에서는 여러 데이터 준비 단계를 수행합니다.

  • 데이터 세트를 필터링하여 이상값 및 잘못된 값 제거
  • 모델 학습에 필요하지 않은 열 제거
  • 원시 데이터에서 새 열 만들기
  • 레이블을 생성하여 지정된 택시 여정에 대한 팁이 있는지 여부를 확인합니다.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

그런 다음 데이터를 두 번째로 전달하여 최종 기능을 추가합니다.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

로지스틱 회귀 모델 만들기

마지막 작업은 레이블이 지정된 데이터를 로지스틱 회귀를 통해 분석할 수 있는 형식으로 변환하는 것입니다. 로지스틱 회귀 알고리즘에 대한 입력은 레이블/기능 벡터 쌍 집합이어야 합니다. 여기서 기능 벡터는 입력 지점을 나타내는 숫자의 벡터입니다.

따라서 범주 열을 숫자로 변환해야 합니다. 특히 열과 weekdayString 열을 정수 표현으로 변환 trafficTimeBins 해야 합니다. 변환을 수행하는 방법에는 여러 가지가 있습니다. 다음 예제에서는 이 방법을 사용합니다 OneHotEncoder .

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

이 작업을 수행하면 모델을 학습하는 데 적합한 형식의 모든 열이 있는 새 DataFrame이 생성됩니다.

로지스틱 회귀 모델 학습

첫 번째 작업은 데이터 세트를 학습 집합과 테스트 또는 유효성 검사 집합으로 분할하는 것입니다.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

이제 두 개의 DataFrame이 있으므로 다음 작업은 모델 수식을 만들고 학습 DataFrame에 대해 실행하는 것입니다. 그런 다음 테스트 데이터 프레임에 대해 유효성을 검사할 수 있습니다. 모델 수식의 다양한 버전을 실험하여 다양한 조합의 영향을 확인합니다.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

이 셀의 출력은 다음과 같습니다.

Area under ROC = 0.9749430523917996

예측의 시각적 표현 만들기

이제 최종 시각화를 생성하여 모델 결과를 해석할 수 있습니다. ROC 곡선은 결과를 검토하는 한 가지 방법입니다.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.