Menggunakan Spark untuk bekerja dengan file data

Selesai

Salah satu manfaat menggunakan Spark adalah Anda dapat menulis dan menjalankan kode dalam berbagai bahasa pemrograman, memungkinkan Anda menggunakan keterampilan pemrograman yang sudah Anda miliki dan menggunakan bahasa yang paling sesuai untuk tugas tertentu. Bahasa default di notebook Azure Databricks Spark baru adalah PySpark - versi Python yang dioptimalkan untuk Spark, yang biasanya digunakan oleh ilmuwan dan analis data karena dukungannya yang kuat untuk manipulasi dan visualisasi data. Selain itu, Anda dapat menggunakan bahasa seperti Scala (bahasa turunan Java yang dapat digunakan secara interaktif) dan SQL (varian dari bahasa SQL yang umum digunakan yang disertakan dalam Pustaka Spark SQL untuk bekerja dengan struktur data relasional). Teknisi perangkat lunak juga dapat membuat solusi terkompilasi yang berjalan di Spark menggunakan kerangka kerja seperti Java.

Menjelajahi data dengan dataframe

Secara default, Spark menggunakan struktur data yang disebut himpunan data terdistribusi andal (RDD); tetapi sementara Anda dapat menulis kode yang bekerja secara langsung dengan RDD, struktur data yang paling umum digunakan untuk bekerja dengan data terstruktur di Spark adalah dataframe, yang disediakan sebagai bagian dari pustaka Spark SQL. Dataframe di Spark mirip dengan yang ada di pustaka Python Pandas yang ada di mana-mana, tetapi dioptimalkan untuk bekerja di lingkungan pemrosesan terdistribusi Spark.

Catatan

Selain API Dataframe, Spark SQL menyediakan API Himpunan Data yang diketik dengan kuat yang didukung di Java dan Scala. Kami akan fokus pada Dataframe API dalam modul ini.

Memuat data ke dalam dataframe

Mari kita jelajahi contoh hipotetis untuk melihat bagaimana Anda dapat menggunakan dataframe untuk bekerja dengan data. Misalkan Anda memiliki data berikut dalam file teks yang dipisahkan koma bernama products.csv di folder data di penyimpanan Sistem File Databricks (DBFS):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Di notebook Spark, Anda dapat menggunakan kode PySpark berikut untuk memuat data ke dalam dataframe dan menampilkan 10 baris pertama:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Baris %pyspark di awal disebut magic, dan memberi tahu Spark bahwa bahasa yang digunakan dalam sel ini adalah PySpark. Berikut kode Scala yang setara untuk contoh data produk:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Magic %spark digunakan untuk menentukan Scala.

Tip

Anda juga dapat memilih bahasa yang ingin Anda gunakan untuk setiap sel di antarmuka Notebook.

Kedua contoh yang ditunjukkan sebelumnya akan menghasilkan output seperti ini:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Perak, 38 Sepeda Gunung 3399,9900
772 Mountain-100 Perak, 42 Sepeda Gunung 3399,9900
773 Mountain-100 Perak, 44 Sepeda Gunung 3399,9900
... ... ... ...

Menentukan skema dataframe

Pada contoh sebelumnya, baris pertama file CSV berisi nama kolom, dan Spark dapat menyimpulkan tipe data setiap kolom dari data yang ada di dalamnya. Anda juga dapat menentukan skema eksplisit untuk data, yang berguna saat nama kolom tidak disertakan dalam file data, seperti contoh CSV ini:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Contoh PySpark berikut menunjukkan cara menentukan skema dataframe yang akan dimuat dari file bernama product-data.csv dalam format ini:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Hasilnya sekali lagi akan mirip dengan:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Perak, 38 Sepeda Gunung 3399,9900
772 Mountain-100 Perak, 42 Sepeda Gunung 3399,9900
773 Mountain-100 Perak, 44 Sepeda Gunung 3399,9900
... ... ... ...

Memfilter dan mengelompokkan dataframe

Anda dapat menggunakan metode kelas Dataframe untuk memfilter, mengurutkan, mengelompokkan, dan memanipulasi data yang ada di dalamnya. Misalnya, contoh kode berikut menggunakan metode pilih untuk mengambil kolom ProductName dan ListPrice dari dataframe df yang berisi data produk di contoh sebelumnya:

pricelist_df = df.select("ProductID", "ListPrice")

Hasil dari contoh kode ini akan terlihat seperti ini:

ProductID ListPrice
771 3399,9900
772 3399,9900
773 3399,9900
... ...

Sama dengan kebanyakan metode manipulasi data, pilih mengembalikan objek dataframe baru.

Tip

Memilih subset kolom dari dataframe adalah operasi yang umum, yang juga dapat dicapai dengan menggunakan sintaks yang lebih pendek berikut:

pricelist_df = df["ProductID", "ListPrice"]

Anda dapat "menautkan" metode bersama-sama untuk melakukan serangkaian manipulasi yang menghasilkan dataframe yang diubah. Misalnya, contoh kode ini menghubungkan metode pilih dan di mana untuk membuat dataframe baru yang berisi kolom ProductName dan ListPrice untuk produk dengan kategori Sepeda Gunung atau Sepeda Jalan:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Hasil dari contoh kode ini akan terlihat seperti ini:

ProductName ListPrice
Mountain-100 Perak, 38 3399,9900
Road-750 Black, 52 539.9900
... ...

Untuk mengelompokkan dan menggabungkan data, Anda dapat menggunakan metode groupBy dan fungsi agregat. Misalnya, kode PySpark berikut menghitung jumlah produk untuk setiap kategori:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Hasil dari contoh kode ini akan terlihat seperti ini:

Kategori jumlah
Headset 3
Roda 14
Sepeda Gunung 32
... ...

Menggunakan ekspresi SQL di Spark

Dataframe API adalah bagian dari pustaka Spark bernama Spark SQL, yang memungkinkan analis data menggunakan ekspresi SQL untuk membuat kueri dan memanipulasi data.

Membuat objek database di katalog Spark

Katalog Spark adalah metastore untuk objek data relasional seperti tampilan dan tabel. Runtime Spark dapat menggunakan katalog untuk mengintegrasikan kode yang ditulis dalam bahasa yang didukung Spark dengan ekspresi SQL yang mungkin lebih alami bagi beberapa analis atau pengembang data.

Salah satu cara paling sederhana untuk membuat data dalam dataframe tersedia untuk kueri di katalog Spark adalah dengan membuat tampilan sementara, seperti yang ditunjukkan dalam contoh kode berikut:

df.createOrReplaceTempView("products")

Tampilan bersifat sementara, yang berarti bahwa tampilan dihapus secara otomatis di akhir sesi saat ini. Anda juga dapat membuat tabel yang bertahan di katalog untuk menentukan database yang dapat dikueri menggunakan Spark SQL.

Catatan

Kami tidak akan menjelajahi tabel katalog Spark secara mendalam dalam modul ini, tetapi ada baiknya meluangkan waktu untuk menyoroti beberapa poin penting:

  • Anda dapat membuat tabel kosong dengan menggunakan metode spark.catalog.createTable. Tabel adalah struktur metadata yang menyimpan data dasarnya di lokasi penyimpanan yang terkait dengan katalog. Menghapus tabel juga akan menghapus data yang mendasarinya.
  • Anda dapat menyimpan dataframe sebagai tabel dengan menggunakan metode saveAsTable.
  • Anda dapat membuat tabel eksternal dengan menggunakan metode spark.catalog.createExternalTable. Tabel eksternal menentukan metadata dalam katalog tetapi mendapatkan data dasarnya dari lokasi penyimpanan eksternal; biasanya folder di data lake. Menghapus tabel eksternal tidak menghapus data yang mendasarinya.

Menggunakan API SQL Spark untuk mengkueri data

Anda dapat menggunakan API SQL Spark dalam kode yang ditulis dalam bahasa apa pun untuk mengkueri data dalam katalog. Misalnya, kode PySpark berikut menggunakan kueri SQL untuk mengembalikan data dari tampilan produk sebagai dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Hasil dari contoh kode akan terlihat seperti tabel berikut:

ProductName ListPrice
Mountain-100 Perak, 38 3399,9900
Road-750 Black, 52 539.9900
... ...

Menggunakan kode SQL

Contoh sebelumnya menunjukkan cara menggunakan API SQL Spark untuk menyematkan ekspresi SQL dalam kode Spark. Di notebook, Anda juga dapat menggunakan magic %sql untuk menjalankan kode SQL yang mengkueri objek dalam katalog, seperti ini:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Contoh kode SQL mengembalikan hasil yang secara otomatis ditampilkan di notebook sebagai tabel, seperti di bawah ini:

Kategori ProductCount
Bib-Shorts 3
Rak Sepeda 1
Stand Sepeda 1
... ...