Modul 3: Melakukan pembersihan dan persiapan data menggunakan Apache Spark

Himpunan data Taksi Kuning NYC berisi lebih dari 1,5 Miliar catatan perjalanan dengan setiap bulan data perjalanan yang masuk ke jutaan catatan, yang membuat pemrosesan catatan ini mahal secara komputasi dan sering kali tidak layak dengan mesin pemrosesan yang tidak didistribusikan.

Penting

Microsoft Fabric saat ini dalam PRATINJAU. Informasi ini berkaitan dengan produk prarilis yang mungkin dimodifikasi secara substansial sebelum dirilis. Microsoft tidak memberikan jaminan, tersurat maupun tersirat, sehubungan dengan informasi yang diberikan di sini.

Dalam tutorial ini, kami menunjukkan cara menggunakan notebook Apache Spark untuk membersihkan dan menyiapkan himpunan data perjalanan taksi. Mesin distribusi Spark yang dioptimalkan membuatnya ideal untuk memproses data dalam volume besar.

Tip

Untuk himpunan data dengan ukuran yang relatif kecil, gunakan UI Data Wrangler, yang merupakan alat antarmuka pengguna grafis berbasis notebook yang menyediakan eksplorasi interaktif dan pengalaman pembersihan data bagi pengguna yang bekerja dengan dataframe pandas di notebook Microsoft Fabric.

Dalam langkah-langkah berikut, Anda membaca data Taksi NYC mentah dari tabel delta lake lakehouse (disimpan dalam modul 1), dan melakukan berbagai operasi untuk membersihkan dan mengubah data tersebut untuk menyiapkannya untuk melatih model pembelajaran mesin.

Ikuti di buku catatan

Perintah/skrip python yang digunakan dalam setiap langkah tutorial ini dapat ditemukan di notebook yang menyertainya: 03-perform-data-cleansing-and-preparation-using-apache-spark.ipynb. Pastikan untuk melampirkan lakehouse ke notebook sebelum mengeksekusinya.

Bersihkan dan siapkan

  1. Muat Data taksi kuning NYC dari tabel delta lakehouse nyctaxi_raw menggunakan spark.read perintah .

    nytaxi_df = spark.read.format("delta").load("Tables/nyctaxi_raw")
    
  2. Untuk membantu proses pembersihan data, selanjutnya kita menggunakan fitur ringkasan bawaan Apache Spark yang menghasilkan statistik ringkasan, yang merupakan langkah-langkah numerik yang menjelaskan aspek kolom dalam dataframe. Langkah-langkah ini termasuk hitungan, rata-rata, simpantan baku, min, dan maks. Gunakan perintah berikut untuk melihat statistik ringkasan semua kolom dalam himpunan data taksi .

    display(nytaxi_df.summary())
    

    Catatan

    Menghasilkan statistik ringkasan adalah proses yang mahal secara komputasi dan dapat memakan banyak waktu eksekusi berdasarkan ukuran dataframe. Dalam tutorial ini, langkah ini membutuhkan waktu antara dua dan tiga menit.

    Cuplikan layar daftar statistik ringkasan yang dihasilkan.

  3. Dalam langkah ini, kami membersihkan kerangka data nytaxi_df dan menambahkan lebih banyak kolom yang berasal dari nilai kolom yang ada.

    Berikut ini adalah kumpulan operasi yang dilakukan dalam langkah ini:

    1. Tambahkan Kolom turunan

      • pickupDate - mengonversi tanggalwaktu ke tanggal untuk visualisasi dan pelaporan
      • weekDay - jumlah hari dalam seminggu
      • weekDayName - nama hari disingkat
      • dayofMonth - jumlah hari dalam sebulan
      • pickupHour - jam waktu penjemputan
      • tripDuration - mewakili durasi dalam menit perjalanan
      • timeBins - Waktu terikat dalam sehari
    2. Kondisi Filter

      • fareAmount adalah antara dan 100.
      • tripDistance lebih besar dari 0.
      • tripDuration kurang dari 3 jam (180 menit).
      • passengerCount adalah antara 1 dan 8.
      • startLat, startLon, endLat, endLon bukan NULL.
      • Hapus perjalanan outstation (outlier) tripDistance>100.
    from pyspark.sql.functions import col,when, dayofweek, date_format, hour,unix_timestamp, round, dayofmonth, lit
    nytaxidf_prep = nytaxi_df.withColumn('pickupDate', col('tpepPickupDateTime').cast('date'))\
                               .withColumn("weekDay", dayofweek(col("tpepPickupDateTime")))\
                               .withColumn("weekDayName", date_format(col("tpepPickupDateTime"), "EEEE"))\
                               .withColumn("dayofMonth", dayofweek(col("tpepPickupDateTime")))\
                               .withColumn("pickupHour", hour(col("tpepPickupDateTime")))\
                               .withColumn("tripDuration", (unix_timestamp(col("tpepDropoffDateTime")) - unix_timestamp(col("tpepPickupDateTime")))/60)\
                               .withColumn("timeBins", when((col("pickupHour") >=7) & (col("pickupHour")<=10) ,"MorningRush")\
                               .when((col("pickupHour") >=11) & (col("pickupHour")<=15) ,"Afternoon")\
                               .when((col("pickupHour") >=16) & (col("pickupHour")<=19) ,"EveningRush")\
                               .when((col("pickupHour") <=6) | (col("pickupHour")>=20) ,"Night"))\
                               .filter("""fareAmount > 0 AND fareAmount < 100 and tripDistance > 0 AND tripDistance < 100 
                                        AND tripDuration > 0 AND tripDuration <= 189 
                                        AND passengerCount > 0 AND passengerCount <= 8
                                        AND startLat IS NOT NULL AND startLon IS NOT NULL AND endLat IS NOT NULL AND endLon IS NOT NULL""")
    

    Catatan

    Apache Spark menggunakan paradigma evaluasi Malas yang menunda eksekusi transformasi hingga tindakan dipicu. Ini memungkinkan Spark untuk mengoptimalkan rencana eksekusi dan menghindari komputasi yang tidak perlu. Dalam langkah ini, definisi transformasi dan filter dibuat. Pembersihan dan transformasi aktual akan dipicu setelah data ditulis (tindakan) di langkah berikutnya.

  4. Setelah kami mendefinisikan langkah-langkah pembersihan dan menetapkannya ke kerangka data bernama nytaxidf_prep, kami menulis data yang dibersihkan dan disiapkan ke tabel delta baru (nyctaxi_prep) di lakehouse yang terpasang, menggunakan serangkaian perintah berikut.

    table_name = "nyctaxi_prep"
    nytaxidf_prep.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
    print(f"Spark dataframe saved to delta table: {table_name}")
    

Data yang dibersihkan dan disiapkan yang dihasilkan dalam modul ini sekarang tersedia di lakehouse sebagai tabel delta dan dapat digunakan untuk pemrosesan lebih lanjut dan menghasilkan wawasan.

Langkah berikutnya