Menggunakan Spark untuk bekerja dengan file data

Selesai

Setelah menyiapkan notebook dan melampirkannya ke kluster, Anda bisa menggunakan Spark untuk membaca dan memproses file data. Spark mendukung berbagai format—seperti CSV, JSON, Parquet, ORC, Avro, dan Delta—dan Databricks menyediakan konektor bawaan untuk mengakses file yang disimpan di ruang kerja, di Azure Data Lake atau Blob Storage, atau di sistem eksternal lainnya.

Alur kerja biasanya mengikuti tiga langkah:

  1. Baca file ke dalam Spark DataFrame menggunakan spark.read dengan format dan jalur yang benar. Saat membaca format teks mentah seperti CSV atau JSON, Spark dapat menyimpulkan skema (nama kolom dan jenis data), tetapi ini terkadang lambat atau tidak dapat diandalkan. Praktik yang lebih baik dalam produksi adalah menentukan skema secara eksplisit sehingga data dimuat secara konsisten dan efisien.

  2. Jelajahi dan ubah DataFrame menggunakan operasi SQL atau DataFrame (misalnya, memfilter baris, memilih kolom, menggabungkan nilai).

  3. Tulis kembali hasilnya ke penyimpanan dalam format yang dipilih.

Bekerja dengan file di Spark dirancang agar konsisten di seluruh himpunan data kecil dan besar. Kode yang sama yang digunakan untuk menguji file CSV kecil juga akan bekerja pada himpunan data yang jauh lebih besar, karena Spark mendistribusikan pekerjaan di seluruh kluster. Ini memudahkan peningkatan skala dari eksplorasi cepat ke pemrosesan data yang lebih kompleks.

Memuat data ke dalam dataframe

Mari kita jelajahi contoh hipotetis untuk melihat bagaimana Anda dapat menggunakan dataframe untuk bekerja dengan data. Misalkan Anda memiliki data berikut dalam file teks yang dibatasi koma bernama products.csv di folder data di penyimpanan Databricks File System (DBFS) Anda:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Di notebook Spark, Anda dapat menggunakan kode PySpark berikut untuk memuat data ke dalam dataframe dan menampilkan 10 baris pertama:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Garis %pyspark di awal disebut sihir, dan memberi tahu Spark bahwa bahasa yang digunakan dalam sel ini adalah PySpark. Berikut kode Scala yang setara untuk contoh data produk:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Magic %spark digunakan untuk menentukan Scala.

Petunjuk / Saran

Anda juga dapat memilih bahasa yang ingin Anda gunakan untuk setiap sel di antarmuka Notebook.

Kedua contoh yang ditunjukkan sebelumnya akan menghasilkan output seperti ini:

ProductID ProductName Kategori Daftar Harga
771 Mountain-100 Perak, 38 Sepeda Gunung 3399,9900
772 Mountain-100 Perak, 42 Sepeda Gunung 3399,9900
773 Mountain-100 Perak, 44 Sepeda Gunung 3399,9900
... ... ... ...

Menentukan skema dataframe

Pada contoh sebelumnya, baris pertama file CSV berisi nama kolom, dan Spark dapat menyimpulkan tipe data setiap kolom dari data yang ada di dalamnya. Anda juga dapat menentukan skema eksplisit untuk data, yang berguna saat nama kolom tidak disertakan dalam file data, seperti contoh CSV ini:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Contoh PySpark berikut menunjukkan cara menentukan skema untuk dataframe yang akan dimuat dari file bernama product-data.csv dalam format ini:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Hasilnya sekali lagi akan mirip dengan:

ProductID ProductName Kategori Daftar Harga
771 Mountain-100 Perak, 38 Sepeda Gunung 3399,9900
772 Mountain-100 Perak, 42 Sepeda Gunung 3399,9900
773 Mountain-100 Perak, 44 Sepeda Gunung 3399,9900
... ... ... ...

Memfilter dan mengelompokkan dataframe

Anda dapat menggunakan metode kelas Dataframe untuk memfilter, mengurutkan, mengelompokkan, dan memanipulasi data yang ada di dalamnya. Misalnya, contoh kode berikut menggunakan select metode untuk mengambil kolom ProductName dan ListPrice dari dataframe df yang berisi data produk dalam contoh sebelumnya:

pricelist_df = df.select("ProductID", "ListPrice")

Hasil dari contoh kode ini akan terlihat seperti ini:

ProductID Daftar Harga
771 3399,9900
772 3399,9900
773 3399,9900
... ...

Sama dengan sebagian besar metode manipulasi data, select mengembalikan objek dataframe baru.

Petunjuk / Saran

Memilih subset kolom dari dataframe adalah operasi yang umum, yang juga dapat dicapai dengan menggunakan sintaks yang lebih pendek berikut:

pricelist_df = df["ProductID", "ListPrice"]

Anda dapat "menautkan" metode bersama-sama untuk melakukan serangkaian manipulasi yang menghasilkan dataframe yang diubah. Misalnya, contoh kode ini menggabungkan metode-metode select dan where untuk membuat dataframe baru yang berisi kolom ProductName dan ListPrice untuk produk dengan kategori Mountain Bikes atau Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Hasil dari contoh kode ini akan terlihat seperti ini:

ProductName Daftar Harga
Mountain-100 Perak, 38 3399,9900
Road-750 Hitam, 52 539.9900
... ...

Untuk mengelompokkan dan menggabungkan data, Anda dapat menggunakan groupby metode dan fungsi agregat. Misalnya, kode PySpark berikut menghitung jumlah produk untuk setiap kategori:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Hasil dari contoh kode ini akan terlihat seperti ini:

Kategori jumlah
Headset 3
Roda 14
Sepeda Gunung 32
... ...

Catatan

Spark DataFrames bersifat deklaratif dan tidak dapat diubah. Setiap transformasi (seperti select, filter, atau groupBy) membuat DataFrame baru yang mewakili apa yang Anda inginkan, bukan cara menjalankannya. Ini membuat kode dapat digunakan kembali, dapat dioptimalkan, dan bebas dari efek samping. Tetapi tidak ada transformasi ini yang benar-benar dijalankan sampai Anda memicu tindakan (misalnya, display, collect, write), di mana Spark menjalankan rencana yang dioptimalkan penuh.

Menggunakan ekspresi SQL di Spark

Dataframe API adalah bagian dari pustaka Spark bernama Spark SQL, yang memungkinkan analis data menggunakan ekspresi SQL untuk membuat kueri dan memanipulasi data.

Membuat objek database di katalog Spark

Katalog Spark adalah metastore untuk objek data relasional seperti tampilan dan tabel. Runtime Spark dapat menggunakan katalog untuk mengintegrasikan kode yang ditulis dalam bahasa yang didukung Spark dengan ekspresi SQL yang mungkin lebih alami bagi beberapa analis atau pengembang data.

Salah satu cara paling sederhana untuk membuat data dalam dataframe tersedia untuk kueri di katalog Spark adalah dengan membuat tampilan sementara, seperti yang ditunjukkan dalam contoh kode berikut:

df.createOrReplaceTempView("products")

Tampilan bersifat sementara, yang berarti bahwa tampilan dihapus secara otomatis di akhir sesi saat ini. Anda juga dapat membuat tabel yang bertahan di katalog untuk menentukan database yang dapat dikueri menggunakan Spark SQL.

Catatan

Kami tidak akan menjelajahi tabel katalog Spark secara mendalam dalam modul ini, tetapi ada baiknya meluangkan waktu untuk menyoroti beberapa poin penting:

  • Anda dapat membuat tabel kosong dengan menggunakan metode spark.catalog.createTable. Tabel adalah struktur metadata yang menyimpan data dasarnya di lokasi penyimpanan yang terkait dengan katalog. Menghapus tabel juga akan menghapus data yang mendasarinya.
  • Anda dapat menyimpan dataframe sebagai tabel dengan menggunakan metode saveAsTable.
  • Anda dapat membuat tabel eksternal dengan menggunakan spark.catalog.createExternalTable metode . Tabel eksternal menentukan metadata dalam katalog tetapi mendapatkan data dasarnya dari lokasi penyimpanan eksternal; biasanya folder di data lake. Menghapus tabel eksternal tidak menghapus data yang mendasar.

Menggunakan API SQL Spark untuk mengkueri data

Anda dapat menggunakan API SQL Spark dalam kode yang ditulis dalam bahasa apa pun untuk mengkueri data dalam katalog. Misalnya, kode PySpark berikut menggunakan kueri SQL untuk mengembalikan data dari tampilan produk sebagai dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Hasil dari contoh kode akan terlihat seperti tabel berikut:

ProductName Daftar Harga
Mountain-100 Perak, 38 3399,9900
Road-750 Hitam, 52 539.9900
... ...

Menggunakan kode SQL

Contoh sebelumnya menunjukkan cara menggunakan API SQL Spark untuk menyematkan ekspresi SQL dalam kode Spark. Di notebook, Anda juga dapat menggunakan magic %sql untuk menjalankan kode SQL yang mengkueri objek dalam katalog, seperti ini:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Contoh kode SQL mengembalikan tataan hasil yang secara otomatis ditampilkan di buku catatan sebagai tabel, seperti yang di bawah ini:

Kategori JumlahProduk
Celana Pendek Bib 3
Rak Sepeda 1
Stand Sepeda 1
... ...