Dasar-dasar PySpark

Artikel ini menjelaskan contoh sederhana untuk mengilustrasikan penggunaan PySpark. Ini mengasumsikan Anda memahami konsep Dasar Apache Spark dan menjalankan perintah dalam buku catatan Azure Databricks yang terhubung ke komputasi. Anda membuat DataFrames menggunakan data sampel, melakukan transformasi dasar termasuk operasi baris dan kolom pada data ini, menggabungkan beberapa DataFrame dan menggabungkan data ini, memvisualisasikan data ini, lalu menyimpannya ke tabel atau file.

Mengunggah data

Beberapa contoh dalam artikel ini menggunakan data sampel yang disediakan Databricks untuk menunjukkan penggunaan DataFrame untuk memuat, mengubah, dan menyimpan data. Jika Anda ingin menggunakan data Anda sendiri yang belum ada di Databricks, Anda dapat mengunggahnya terlebih dahulu dan membuat DataFrame dari data tersebut. Lihat Membuat atau mengubah tabel menggunakan unggahan file dan Bekerja dengan file dalam volume Katalog Unity.

Tentang data sampel Databricks

Databricks menyediakan data sampel di samples katalog dan di /databricks-datasets direktori.

  • Untuk mengakses data sampel dalam samples katalog, gunakan format samples.<schema-name>.<table-name>. Artikel ini menggunakan tabel dalam samples.tpch skema, yang berisi data dari bisnis fiktif. Tabel berisi customer informasi tentang pelanggan, dan orders berisi informasi tentang pesanan yang dilakukan oleh pelanggan tersebut.
  • Gunakan dbutils.fs.ls untuk menjelajahi data di /databricks-datasets. Gunakan Spark SQL atau DataFrames untuk mengkueri data di lokasi ini menggunakan jalur file. Untuk mempelajari selengkapnya tentang data sampel yang disediakan Databricks, lihat Sampel himpunan data.

Mengimpor jenis data

Banyak operasi PySpark mengharuskan Anda menggunakan fungsi SQL atau berinteraksi dengan jenis Spark asli. Baik secara langsung hanya mengimpor fungsi dan jenis yang Anda butuhkan, atau untuk menghindari penggantian fungsi bawaan Python, impor modul ini menggunakan alias umum.

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F

Untuk daftar komprehensif jenis data, lihat Jenis Data PySpark.

Untuk daftar komprehensif fungsi PySpark SQL, lihat Fungsi PySpark.

Membuat DataFrame

Ada beberapa cara untuk membuat DataFrame. Biasanya Anda menentukan DataFrame terhadap sumber data seperti tabel atau kumpulan file. Kemudian seperti yang dijelaskan di bagian konsep dasar Apache Spark, gunakan tindakan, seperti display, untuk memicu transformasi yang akan dijalankan. Metode display menghasilkan "DataFrames".

Membuat DataFrame dengan nilai yang ditentukan

Untuk membuat DataFrame dengan nilai yang ditentukan, gunakan metode createDataFrame, di mana baris dinyatakan sebagai daftar tuples:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Perhatikan dalam output bahwa jenis data kolom df_children secara otomatis disimpulkan. Anda dapat menentukan jenis secara alternatif dengan menambahkan skema. Skema didefinisikan menggunakan StructType yang terdiri dari StructFields yang menentukan nama, jenis data, dan bendera boolean yang menunjukkan apakah berisi nilai null atau tidak. Anda harus mengimpor jenis data dari pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Membuat DataFrame dari tabel di Unity Catalog

Untuk membuat DataFrame dari tabel di Unity Catalog, gunakan table metode mengidentifikasi tabel menggunakan format <catalog-name>.<schema-name>.<table-name>. Klik Katalog di bilah navigasi kiri untuk menggunakan Catalog Explorer untuk menavigasi ke tabel Anda. Klik itu, lalu pilih Salin jalur tabel untuk menyisipkan jalur tabel ke dalam buku catatan.

Contoh berikut memuat tabel samples.tpch.customer, tetapi Anda dapat menyediakan jalur ke tabel Anda sendiri.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Membuat DataFrame dari file yang diunggah

Untuk membuat DataFrame dari file yang Anda unggah ke volume Unity Catalog, gunakan read properti . Metode ini mengembalikan DataFrameReader, yang kemudian dapat Anda gunakan untuk membaca format yang sesuai. Klik opsi katalog di bilah sisi kecil di sebelah kiri dan gunakan browser katalog untuk menemukan file Anda. Pilih, lalu klik Salin jalur file volume.

Contoh di bawah ini membaca dari *.csv file, tetapi DataFrameReader mendukung pengunggahan file dalam banyak format lainnya. Lihat Metode DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Untuk informasi selengkapnya tentang volume Katalog Unity, lihat Apa itu volume Katalog Unity?.

Membuat DataFrame dari respons JSON

Untuk membuat DataFrame dari payload respons JSON yang dikembalikan oleh REST API, gunakan paket Python untuk mengkueri requests dan mengurai respons. Anda harus mengimpor paket untuk menggunakannya. Contoh ini menggunakan data dari basis data aplikasi obat Badan Pengawas Obat dan Makanan Amerika Serikat.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Untuk informasi tentang bekerja dengan JSON dan data semi-terstruktur lainnya di Databricks, lihat Model data semi-terstruktur.

Pilih bidang atau objek JSON

Untuk memilih bidang atau objek tertentu dari JSON yang dikonversi, gunakan [] notasi. Misalnya, untuk memilih products kolom yang itu sendiri merupakan array produk.

display(df_drugs.select(df_drugs["products"]))

Anda juga dapat menautkan panggilan metode untuk melintasi beberapa bidang. Misalnya, untuk menghasilkan nama merek produk pertama dalam aplikasi obat:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Membuat DataFrame dari file

Untuk menunjukkan pembuatan DataFrame dari file, contoh ini memuat data CSV di /databricks-datasets direktori.

Untuk menavigasi ke himpunan data sampel, Anda dapat menggunakan perintah sistem file Databricks Utilties . Contoh berikut menggunakan dbutils untuk mencantumkan himpunan data yang tersedia di /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Atau, Anda dapat menggunakan %fs untuk mengakses perintah sistem file Databricks CLI, seperti yang ditunjukkan dalam contoh berikut:

%fs ls '/databricks-datasets'

Untuk membuat DataFrame dari file atau direktori file, tentukan jalur dalam load metode :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Mengubah data dengan DataFrames

DataFrame memudahkan transformasi data menggunakan metode bawaan untuk mengurutkan, memfilter, dan mengagregasi data. Banyak transformasi tidak ditentukan sebagai metode pada DataFrames, tetapi sebaliknya disediakan dalam pyspark.sql.functions paket. Lihat Fungsi Databricks PySpark SQL.

Operasi kolom

Spark menyediakan banyak operasi kolom dasar:

Tip

Untuk menghasilkan semua kolom dalam DataFrame, gunakan columns, misalnya df_customer.columns.

Pilih kolom

Anda dapat memilih kolom tertentu menggunakan select dan col. Fungsi col ini ada di pyspark.sql.functions submodul.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Anda juga dapat merujuk ke kolom dengan menggunakan expr, yang merupakan ekspresi yang didefinisikan sebagai string.

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Anda juga dapat menggunakan selectExpr, yang menerima ekspresi SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Untuk memilih kolom menggunakan string literal, lakukan hal berikut:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Untuk memilih kolom secara eksplisit dari DataFrame tertentu, Anda dapat menggunakan [] operator atau . operator. (Operator . tidak dapat digunakan untuk memilih kolom yang dimulai dengan bilangan bulat, atau yang berisi spasi atau karakter khusus.) Ini bisa sangat membantu ketika Anda bergabung dengan DataFrames di mana beberapa kolom memiliki nama yang sama.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Membuat kolom

Untuk membuat kolom baru, gunakan metode .withColumn Contoh berikut membuat kolom baru yang berisi nilai boolean berdasarkan apakah saldo c_acctbal akun pelanggan melebihi 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Ganti nama kolom

Untuk mengganti nama kolom, gunakan withColumnRenamed metode , yang menerima nama kolom yang sudah ada dan baru:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Metode alias ini sangat membantu ketika Anda ingin mengganti nama kolom Anda sebagai bagian dari agregasi:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Jenis kolom yang di-cast

Dalam beberapa kasus, Anda mungkin ingin mengubah tipe data untuk satu atau beberapa kolom di DataFrame Anda. Untuk melakukan ini, gunakan cast metode untuk mengonversi antara jenis data kolom. Contoh berikut menunjukkan cara mengonversi kolom dari bilangan bulat ke jenis string, menggunakan col metode untuk mereferensikan kolom:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Hapus kolom

Untuk menghapus kolom, Anda bisa menghilangkan kolom selama memilih atau select(*) except atau Anda dapat menggunakan metode drop:

df_customer_flag_renamed.drop("balance_flag_renamed")

Anda juga dapat menghilangkan beberapa kolom sekaligus:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operasi baris

Spark menyediakan banyak operasi baris dasar.

Memfilter baris

Untuk memfilter baris, gunakan filter metode atau where pada DataFrame untuk mengembalikan hanya baris tertentu. Untuk mengidentifikasi kolom yang akan difilter, gunakan metode col atau ekspresi yang menjadi kolom.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Untuk memfilter beberapa kondisi, gunakan operator logis. Misalnya, & dan | memungkinkan Anda untuk AND dan OR kondisi, masing-masing. Contoh berikut memfilter baris di mana c_nationkey sama dengan 20 dan c_acctbal lebih besar dari 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Menghapus baris duplikat

Untuk menghilangkan duplikasi pada baris, gunakan distinct, yang hanya menghasilkan baris unik.

df_unique = df_customer.distinct()

Menangani nilai null

Untuk menangani nilai null, hapus baris yang berisi nilai null menggunakan metode na.drop. Metode ini memungkinkan Anda menentukan apakah Anda ingin menghilangkan baris yang berisi any nilai null atau all nilai null.

Untuk menghilangkan nilai null, gunakan salah satu contoh berikut.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Jika Anda hanya ingin memfilter baris yang berisi semua nilai null, gunakan yang berikut ini:

df_customer_no_nulls = df_customer.na.drop("all")

Anda dapat menerapkan ini untuk subset kolom dengan menentukan ini, seperti yang ditunjukkan di bawah ini:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Untuk mengisi nilai yang hilang, gunakan metode fill. Anda dapat memilih untuk menerapkan ini ke semua kolom atau subset kolom. Dalam contoh di bawah ini, saldo akun yang memiliki nilai null untuk saldo akun c_acctbal diisi dengan 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Untuk mengganti string dengan nilai lain, gunakan replace metode . Dalam contoh di bawah ini, string alamat kosong diganti dengan kata UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Tambahkan baris

Untuk menambahkan baris, Anda perlu menggunakan union metode untuk membuat DataFrame baru. Dalam contoh berikut, DataFrame df_that_one_customer yang dibuat sebelumnya dan df_filtered_customer digabungkan, yang mengembalikan DataFrame dengan tiga pelanggan:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Note

Anda juga dapat menggabungkan DataFrames dengan menulisnya ke tabel lalu menambahkan baris baru. Untuk beban kerja produksi, pemrosesan bertahap sumber data ke tabel target dapat secara drastis mengurangi latensi dan biaya komputasi saat ukuran data tumbuh. Lihat Konektor standar di Lakeflow Connect.

Urutkan baris

Important

Pengurutan bisa mahal dalam skala besar, dan jika Anda menyimpan data yang diurutkan dan memuat ulang data dengan Spark, urutan tidak dijamin. Pastikan Anda sengaja dalam penggunaan pengurutan.

Untuk mengurutkan baris menurut satu atau beberapa kolom, gunakan sort metode atau orderBy . Secara default metode ini mengurutkan dalam urutan naik:

df_customer.orderBy(col("c_acctbal"))

Untuk memfilter dalam urutan turun, gunakan desc:

df_customer.sort(col("c_custkey").desc())

Contoh berikut menunjukkan cara mengurutkan pada dua kolom:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Untuk membatasi jumlah baris yang akan dikembalikan setelah DataFrame diurutkan, gunakan metode .limit Contoh berikut hanya menampilkan hasil teratas 10 :

display(df_sorted.limit(10))

Menggabungkan DataFrame

Untuk menggabungkan dua atau beberapa DataFrame, gunakan join metode . Anda dapat menentukan bagaimana Anda ingin DataFrames digabungkan dalam parameter how (jenis penggabungan) dan on (kolom yang digunakan untuk mendasarkan penggabungan). Jenis gabungan umum meliputi:

  • inner: Ini adalah jenis gabungan default, yang mengembalikan DataFrame yang hanya mempertahankan baris di mana terdapat kecocokan dengan parameter on di seluruh DataFrame.
  • left: Ini menyimpan semua baris DataFrame pertama yang ditentukan dan hanya baris dari DataFrame kedua yang ditentukan yang memiliki kecocokan dengan baris pertama.
  • outer: Gabungan luar menyimpan semua baris dari kedua DataFrames terlepas dari kecocokan.

Untuk informasi terperinci tentang gabungan, lihat Bekerja dengan gabungan di Azure Databricks. Untuk daftar gabungan yang didukung di PySpark, lihat Gabungan DataFrame.

Contoh berikut mengembalikan satu DataFrame di mana setiap baris orders DataFrame digabungkan dengan baris yang sesuai dari customers DataFrame. Gabungan dalam digunakan, karena harapannya adalah bahwa setiap pesanan sesuai dengan persis satu pelanggan.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Untuk bergabung pada beberapa kondisi, gunakan operator boolean seperti & dan | untuk menentukan AND dan OR, masing-masing. Contoh berikut menambahkan kondisi tambahan, memfilter hanya ke baris yang memiliki o_totalprice lebih besar dari 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Data agregat

Untuk menggabungkan data dalam DataFrame, mirip GROUP BY dengan di SQL, gunakan groupBy metode untuk menentukan kolom yang akan dikelompokkan menurut dan agg metode untuk menentukan agregasi. Impor agregasi umum termasuk avg, sum, max, dan min dari pyspark.sql.functions. Contoh berikut menunjukkan rata-rata saldo pelanggan berdasarkan segmen pasar:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Beberapa agregasi adalah tindakan, yang berarti bahwa mereka memicu komputasi. Dalam hal ini Anda tidak perlu menggunakan tindakan lain untuk menghasilkan hasil.

Untuk menghitung baris dalam DataFrame, gunakan count metode :

df_customer.count()

Pemanggilan berantai

Metode yang mentransformasi DataFrames mengembalikan DataFrames, dan Spark tidak bertindak pada transformasi sampai sebuah aksi dipanggil. Evaluasi tunda ini berarti Anda dapat menghubungkan beberapa metode demi kenyamanan dan keterbacaan. Contoh berikut menunjukkan cara menautkan pemfilteran, agregasi, dan pengurutan:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Memvisualisasikan DataFrame Anda

Untuk memvisualisasikan DataFrame di buku catatan, klik + tanda di samping tabel di kiri atas DataFrame, lalu pilih Visualisasi untuk menambahkan satu atau beberapa bagan berdasarkan DataFrame Anda. Untuk detail tentang visualisasi, lihat Visualisasi di buku catatan Databricks dan editor SQL.

display(df_order)

Untuk melakukan visualisasi tambahan, Databricks merekomendasikan penggunaan API pandas untuk Spark. memungkinkan Anda mengonversi .pandas_api() ke API pandas yang sesuai untuk Spark DataFrame. Untuk informasi selengkapnya, lihat API Pandas di Spark.

Menyimpan data Anda

Setelah mengubah data, Anda dapat menyimpannya menggunakan DataFrameWriter metode . Daftar lengkap metode ini dapat ditemukan di DataFrameWriter. Bagian berikut ini memperlihatkan cara menyimpan DataFrame Anda sebagai tabel dan sebagai kumpulan file data.

Simpan DataFrame Anda sebagai tabel

Untuk menyimpan DataFrame Anda sebagai tabel di Unity Catalog, gunakan write.saveAsTable metode dan tentukan jalur dalam format <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Menulis DataFrame Anda sebagai CSV

Untuk menulis DataFrame Anda ke format *.csv, gunakan metode write.csv, menentukan format dan opsi. Secara default jika data ada di jalur yang ditentukan, operasi tulis gagal. Anda dapat menentukan salah satu mode berikut untuk mengambil tindakan yang berbeda:

  • overwrite menimpa semua data yang ada di jalur target dengan isi DataFrame.
  • append menambahkan konten DataFrame ke data di jalur target.
  • ignore secara diam-diam gagal menulis jika data ada di jalur target.

Contoh berikut menunjukkan penimpaan data dengan konten DataFrame sebagai file CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Langkah selanjutnya

Untuk memanfaatkan lebih banyak kemampuan Spark di Databricks, lihat: