Menggunakan Spark untuk bekerja dengan file data
Salah satu manfaat menggunakan Spark adalah Anda dapat menulis dan menjalankan kode dalam berbagai bahasa pemrograman, memungkinkan Anda menggunakan keterampilan pemrograman yang sudah Anda miliki dan menggunakan bahasa yang paling sesuai untuk tugas tertentu. Bahasa default di notebook Azure Databricks Spark baru adalah PySpark - versi Python yang dioptimalkan untuk Spark, yang biasanya digunakan oleh ilmuwan dan analis data karena dukungannya yang kuat untuk manipulasi dan visualisasi data. Selain itu, Anda dapat menggunakan bahasa seperti Scala (bahasa turunan Java yang dapat digunakan secara interaktif) dan SQL (varian dari bahasa SQL yang umum digunakan yang disertakan dalam Pustaka Spark SQL untuk bekerja dengan struktur data relasional). Teknisi perangkat lunak juga dapat membuat solusi terkompilasi yang berjalan di Spark menggunakan kerangka kerja seperti Java.
Menjelajahi data dengan dataframe
Secara default, Spark menggunakan struktur data yang disebut himpunan data terdistribusi andal (RDD); tetapi sementara Anda dapat menulis kode yang bekerja secara langsung dengan RDD, struktur data yang paling umum digunakan untuk bekerja dengan data terstruktur di Spark adalah dataframe, yang disediakan sebagai bagian dari pustaka Spark SQL. Dataframe di Spark mirip dengan yang ada di pustaka Python Pandas yang ada di mana-mana, tetapi dioptimalkan untuk bekerja di lingkungan pemrosesan terdistribusi Spark.
Catatan
Selain API Dataframe, Spark SQL menyediakan API Himpunan Data yang diketik dengan kuat yang didukung di Java dan Scala. Kami akan fokus pada Dataframe API dalam modul ini.
Memuat data ke dalam dataframe
Mari kita jelajahi contoh hipotetis untuk melihat bagaimana Anda dapat menggunakan dataframe untuk bekerja dengan data. Misalkan Anda memiliki data berikut dalam file teks yang dipisahkan koma bernama products.csv di folder data di penyimpanan Sistem File Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Di notebook Spark, Anda dapat menggunakan kode PySpark berikut untuk memuat data ke dalam dataframe dan menampilkan 10 baris pertama:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Baris %pyspark
di awal disebut magic, dan memberi tahu Spark bahwa bahasa yang digunakan dalam sel ini adalah PySpark. Berikut kode Scala yang setara untuk contoh data produk:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Magic %spark
digunakan untuk menentukan Scala.
Tip
Anda juga dapat memilih bahasa yang ingin Anda gunakan untuk setiap sel di antarmuka Notebook.
Kedua contoh yang ditunjukkan sebelumnya akan menghasilkan output seperti ini:
ProductID | ProductName | Kategori | ListPrice |
---|---|---|---|
771 | Mountain-100 Perak, 38 | Sepeda Gunung | 3399,9900 |
772 | Mountain-100 Perak, 42 | Sepeda Gunung | 3399,9900 |
773 | Mountain-100 Perak, 44 | Sepeda Gunung | 3399,9900 |
... | ... | ... | ... |
Menentukan skema dataframe
Pada contoh sebelumnya, baris pertama file CSV berisi nama kolom, dan Spark dapat menyimpulkan tipe data setiap kolom dari data yang ada di dalamnya. Anda juga dapat menentukan skema eksplisit untuk data, yang berguna saat nama kolom tidak disertakan dalam file data, seperti contoh CSV ini:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Contoh PySpark berikut menunjukkan cara menentukan skema dataframe yang akan dimuat dari file bernama product-data.csv dalam format ini:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Hasilnya sekali lagi akan mirip dengan:
ProductID | ProductName | Kategori | ListPrice |
---|---|---|---|
771 | Mountain-100 Perak, 38 | Sepeda Gunung | 3399,9900 |
772 | Mountain-100 Perak, 42 | Sepeda Gunung | 3399,9900 |
773 | Mountain-100 Perak, 44 | Sepeda Gunung | 3399,9900 |
... | ... | ... | ... |
Memfilter dan mengelompokkan dataframe
Anda dapat menggunakan metode kelas Dataframe untuk memfilter, mengurutkan, mengelompokkan, dan memanipulasi data yang ada di dalamnya. Misalnya, contoh kode berikut menggunakan metode pilih untuk mengambil kolom ProductName dan ListPrice dari dataframe df yang berisi data produk di contoh sebelumnya:
pricelist_df = df.select("ProductID", "ListPrice")
Hasil dari contoh kode ini akan terlihat seperti ini:
ProductID | ListPrice |
---|---|
771 | 3399,9900 |
772 | 3399,9900 |
773 | 3399,9900 |
... | ... |
Sama dengan kebanyakan metode manipulasi data, pilih mengembalikan objek dataframe baru.
Tip
Memilih subset kolom dari dataframe adalah operasi yang umum, yang juga dapat dicapai dengan menggunakan sintaks yang lebih pendek berikut:
pricelist_df = df["ProductID", "ListPrice"]
Anda dapat "menautkan" metode bersama-sama untuk melakukan serangkaian manipulasi yang menghasilkan dataframe yang diubah. Misalnya, contoh kode ini menghubungkan metode pilih dan di mana untuk membuat dataframe baru yang berisi kolom ProductName dan ListPrice untuk produk dengan kategori Sepeda Gunung atau Sepeda Jalan:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Hasil dari contoh kode ini akan terlihat seperti ini:
ProductName | ListPrice |
---|---|
Mountain-100 Perak, 38 | 3399,9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Untuk mengelompokkan dan menggabungkan data, Anda dapat menggunakan metode groupBy dan fungsi agregat. Misalnya, kode PySpark berikut menghitung jumlah produk untuk setiap kategori:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Hasil dari contoh kode ini akan terlihat seperti ini:
Kategori | jumlah |
---|---|
Headset | 3 |
Roda | 14 |
Sepeda Gunung | 32 |
... | ... |
Menggunakan ekspresi SQL di Spark
Dataframe API adalah bagian dari pustaka Spark bernama Spark SQL, yang memungkinkan analis data menggunakan ekspresi SQL untuk membuat kueri dan memanipulasi data.
Membuat objek database di katalog Spark
Katalog Spark adalah metastore untuk objek data relasional seperti tampilan dan tabel. Runtime Spark dapat menggunakan katalog untuk mengintegrasikan kode yang ditulis dalam bahasa yang didukung Spark dengan ekspresi SQL yang mungkin lebih alami bagi beberapa analis atau pengembang data.
Salah satu cara paling sederhana untuk membuat data dalam dataframe tersedia untuk kueri di katalog Spark adalah dengan membuat tampilan sementara, seperti yang ditunjukkan dalam contoh kode berikut:
df.createOrReplaceTempView("products")
Tampilan bersifat sementara, yang berarti bahwa tampilan dihapus secara otomatis di akhir sesi saat ini. Anda juga dapat membuat tabel yang bertahan di katalog untuk menentukan database yang dapat dikueri menggunakan Spark SQL.
Catatan
Kami tidak akan menjelajahi tabel katalog Spark secara mendalam dalam modul ini, tetapi ada baiknya meluangkan waktu untuk menyoroti beberapa poin penting:
- Anda dapat membuat tabel kosong dengan menggunakan metode
spark.catalog.createTable
. Tabel adalah struktur metadata yang menyimpan data dasarnya di lokasi penyimpanan yang terkait dengan katalog. Menghapus tabel juga akan menghapus data yang mendasarinya. - Anda dapat menyimpan dataframe sebagai tabel dengan menggunakan metode
saveAsTable
. - Anda dapat membuat tabel eksternal dengan menggunakan metode
spark.catalog.createExternalTable
. Tabel eksternal menentukan metadata dalam katalog tetapi mendapatkan data dasarnya dari lokasi penyimpanan eksternal; biasanya folder di data lake. Menghapus tabel eksternal tidak menghapus data yang mendasarinya.
Menggunakan API SQL Spark untuk mengkueri data
Anda dapat menggunakan API SQL Spark dalam kode yang ditulis dalam bahasa apa pun untuk mengkueri data dalam katalog. Misalnya, kode PySpark berikut menggunakan kueri SQL untuk mengembalikan data dari tampilan produk sebagai dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Hasil dari contoh kode akan terlihat seperti tabel berikut:
ProductName | ListPrice |
---|---|
Mountain-100 Perak, 38 | 3399,9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Menggunakan kode SQL
Contoh sebelumnya menunjukkan cara menggunakan API SQL Spark untuk menyematkan ekspresi SQL dalam kode Spark. Di notebook, Anda juga dapat menggunakan magic %sql
untuk menjalankan kode SQL yang mengkueri objek dalam katalog, seperti ini:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Contoh kode SQL mengembalikan hasil yang secara otomatis ditampilkan di notebook sebagai tabel, seperti di bawah ini:
Kategori | ProductCount |
---|---|
Bib-Shorts | 3 |
Rak Sepeda | 1 |
Stand Sepeda | 1 |
... | ... |