모듈 3: Apache Spark를 사용하여 데이터 정리 및 준비 수행

NYC Yellow Taxi 데이터 세트에는 매월 15억 개 이상의 여행 레코드가 포함되어 있으며, 수백만 개의 레코드가 실행되어 이러한 레코드를 계산 비용이 많이 들고 비분배 처리 엔진에서는 처리할 수 없는 경우가 많습니다.

중요

Microsoft Fabric은 현재 미리 보기로 제공됩니다. 이 정보는 릴리스되기 전에 상당히 수정될 수 있는 시험판 제품과 관련이 있습니다. Microsoft는 여기에 제공된 정보와 관련하여 명시적이거나 묵시적인 어떠한 보증도 하지 않습니다.

이 자습서에서는 Apache Spark Notebook을 사용하여 택시 운행 데이터 세트를 클린 준비하는 방법을 보여 줍니다. Spark의 최적화된 배포 엔진은 대량의 데이터를 처리하는 데 적합합니다.

비교적 작은 크기의 데이터 세트의 경우 Microsoft Fabric Notebook에서 pandas 데이터 프레임으로 작업하는 사용자에게 대화형 탐색 및 데이터 정리 환경을 제공하는 Notebook 기반 그래픽 사용자 인터페이스 도구인 Data Wrangler UI를 사용합니다.

다음 단계에서는 레이크하우스 델타 레이크 테이블(모듈 1에 저장됨)에서 원시 NYC Taxi 데이터를 읽고 다양한 작업을 수행하여 해당 데이터를 클린 변환하여 기계 학습 모델 학습을 준비합니다.

Notebook에서 팔로우

이 자습서의 각 단계에서 사용되는 python 명령/스크립트는 함께 제공되는 Notebook: 03-perform-data-cleansing-and-preparation-using-apache-spark.ipynb에서 찾을 수 있습니다. 실행하기 전에 레이크하우스를 Notebook에 연결 해야 합니다.

정리 및 준비

  1. 명령을 사용하여 레이크하우스 델타 테이블 nyctaxi_raw NYC 노란색 택시 데이터를 로드합니다 spark.read .

    nytaxi_df = spark.read.format("delta").load("Tables/nyctaxi_raw")
    
  2. 데이터 정리 프로세스를 지원하기 위해 다음으로, 데이터 프레임에서 열의 측면을 설명하는 숫자 측정값인 요약 통계를 생성하는 Apache Spark의 기본 제공 요약 기능을 사용합니다. 이러한 측정값에는 개수, 평균, 표준 편차, 최소 및 최대값이 포함됩니다. 다음 명령을 사용하여 taxi 데이터 세트의 모든 열에 대한 요약 통계를 볼 수 있습니다.

    display(nytaxi_df.summary())
    

    참고

    요약 통계 생성은 계산 비용이 많이 드는 프로세스이며 데이터 프레임의 크기에 따라 상당한 실행 시간이 걸릴 수 있습니다. 이 자습서에서 단계는 2~3분 정도 걸립니다.

    생성된 요약 통계 목록의 스크린샷

  3. 이 단계에서는 nytaxi_df 데이터 프레임을 클린 기존 열 값에서 파생된 열을 더 추가합니다.

    다음은 이 단계에서 수행되는 작업 집합입니다.

    1. 파생 열 추가

      • pickupDate - 시각화 및 보고를 위해 datetime을 날짜로 변환
      • weekDay - 요일 번호
      • weekDayName - 약어로 된 일 이름
      • dayofMonth - 월의 일 수
      • pickupHour - 픽업 시간
      • tripDuration - 여행 시간(분)을 나타냅니다.
      • timeBins - 하루 중 범주화된 시간
    2. 필터 조건

      • fareAmount는 100 사이입니다.
      • tripDistance가 0보다 큰 경우
      • tripDuration은 3시간(180분) 미만입니다.
      • passengerCount는 1에서 8 사이입니다.
      • startLat, startLon, endLat, endLon은 NULL이 아닙니다.
      • 아웃스테이션 여정(이상값) tripDistance>100을 제거합니다.
    from pyspark.sql.functions import col,when, dayofweek, date_format, hour,unix_timestamp, round, dayofmonth, lit
    nytaxidf_prep = nytaxi_df.withColumn('pickupDate', col('tpepPickupDateTime').cast('date'))\
                               .withColumn("weekDay", dayofweek(col("tpepPickupDateTime")))\
                               .withColumn("weekDayName", date_format(col("tpepPickupDateTime"), "EEEE"))\
                               .withColumn("dayofMonth", dayofweek(col("tpepPickupDateTime")))\
                               .withColumn("pickupHour", hour(col("tpepPickupDateTime")))\
                               .withColumn("tripDuration", (unix_timestamp(col("tpepDropoffDateTime")) - unix_timestamp(col("tpepPickupDateTime")))/60)\
                               .withColumn("timeBins", when((col("pickupHour") >=7) & (col("pickupHour")<=10) ,"MorningRush")\
                               .when((col("pickupHour") >=11) & (col("pickupHour")<=15) ,"Afternoon")\
                               .when((col("pickupHour") >=16) & (col("pickupHour")<=19) ,"EveningRush")\
                               .when((col("pickupHour") <=6) | (col("pickupHour")>=20) ,"Night"))\
                               .filter("""fareAmount > 0 AND fareAmount < 100 and tripDistance > 0 AND tripDistance < 100 
                                        AND tripDuration > 0 AND tripDuration <= 189 
                                        AND passengerCount > 0 AND passengerCount <= 8
                                        AND startLat IS NOT NULL AND startLon IS NOT NULL AND endLat IS NOT NULL AND endLon IS NOT NULL""")
    

    참고

    Apache Spark는 지연 평가 패러다임을 사용하여 작업이 트리거될 때까지 변환 실행을 지연합니다. 이를 통해 Spark는 실행 계획을 최적화하고 불필요한 계산을 방지할 수 있습니다. 이 단계에서는 변환 및 필터의 정의를 만듭니다. 실제 정리 및 변환은 다음 단계에서 데이터가 기록되면(작업) 트리거됩니다.

  4. 정리 단계를 정의하고 nytaxidf_prep 라는 데이터 프레임에 할당한 후에는 다음 명령 집합을 사용하여 정리되고 준비된 데이터를 연결된 레이크하우스의 새 델타 테이블(nyctaxi_prep)에 씁니다.

    table_name = "nyctaxi_prep"
    nytaxidf_prep.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
    print(f"Spark dataframe saved to delta table: {table_name}")
    

이 모듈에서 생성된 정리되고 준비된 데이터는 이제 레이크하우스에서 델타 테이블로 사용할 수 있으며 추가 처리 및 인사이트 생성에 사용할 수 있습니다.

다음 단계