Bagikan melalui


Fungsi untuk menentukan himpunan data

Modul pyspark.pipelines (di sini dikenal sebagai dp) mengimplementasikan banyak dari fungsionalitas intinya dengan menggunakan pendekorasi. Dekorator ini menerima fungsi yang menentukan kueri streaming atau batch dan mengembalikan Apache Spark DataFrame. Sintaks berikut menunjukkan contoh sederhana untuk menentukan himpunan data alur:

from pyspark import pipelines as dp

@dp.table()
def function_name(): # This is the function decorated
  return (<query>) # This is the query logic that defines the dataset

Halaman ini menyediakan gambaran umum fungsi dan kueri yang menentukan himpunan data dalam alur. Untuk daftar lengkap dekorator yang tersedia, silakan merujuk ke Referensi Pengembang Pipeline.

Fungsi yang Anda gunakan untuk menentukan himpunan data tidak boleh menyertakan logika Python arbitrer yang tidak terkait dengan himpunan data, termasuk panggilan ke API pihak ketiga. Pipeline menjalankan fungsi ini beberapa kali selama perencanaan, validasi, dan pembaruan. Termasuk logika arbitrer dapat menyebabkan hasil yang tidak terduga.

Membaca data untuk memulai definisi himpunan data

Fungsi yang digunakan untuk menentukan himpunan data alur biasanya dimulai dengan operasi spark.read atau spark.readStream. Operasi baca ini mengembalikan objek DataFrame statis atau streaming yang Anda gunakan untuk menentukan transformasi tambahan sebelum mengembalikan DataFrame. Contoh lain dari operasi spark yang mengembalikan DataFrame termasuk spark.table, atau spark.range.

Fungsi tidak boleh mereferensikan DataFrames yang ditentukan di luar fungsi. Mencoba mereferensikan DataFrames yang ditentukan pada cakupan yang berbeda dapat mengakibatkan perilaku yang tidak terduga. Untuk contoh pola metaprogram untuk membuat beberapa tabel, lihat Membuat tabel dalam perulanganfor.

Contoh berikut menunjukkan sintaks dasar untuk membaca data menggunakan logika batch atau streaming:

from pyspark import pipelines as dp

# Batch read on a table
@dp.materialized_view()
def function_name():
  return spark.read.table("catalog_name.schema_name.table_name")

# Batch read on a path
@dp.materialized_view()
def function_name():
  return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")


# Streaming read on a table
@dp.table()
def function_name():
  return spark.readStream.table("catalog_name.schema_name.table_name")

# Streaming read on a path
@dp.table()
def function_name():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("/Volumes/catalog_name/schema_name/volume_name/data_path")
  )

Jika Anda perlu membaca data dari REST API eksternal, terapkan koneksi ini menggunakan sumber data kustom Python. Lihat Sumber data kustom PySpark.

Nota

Dimungkinkan untuk membuat kerangka data Apache Spark apa saja dari kumpulan data Python, termasuk kerangka data pandas, dicts, dan daftar. Pola-pola ini mungkin berguna selama pengembangan dan pengujian, tetapi sebagian besar definisi himpunan data alur produksi harus dimulai dengan memuat data dari file, sistem eksternal, atau tabel atau tampilan yang ada.

Transformasi berantai

Alur mendukung hampir semua transformasi Apache Spark DataFrame. Anda dapat menyertakan sejumlah transformasi dalam fungsi definisi himpunan data Anda, tetapi Anda harus memastikan bahwa metode yang Anda gunakan selalu mengembalikan objek DataFrame.

Jika Anda memiliki transformasi perantara yang mendorong beberapa beban kerja hilir tetapi Anda tidak perlu mewujudkannya sebagai tabel, gunakan @dp.temporary_view() untuk menambahkan tampilan sementara ke alur Anda. Anda kemudian dapat mereferensikan tampilan ini menggunakan spark.read.table("temp_view_name") dalam beberapa definisi himpunan data hilir. Sintaks berikut menunjukkan pola ini:

from pyspark import pipelines as dp

@dp.temporary_view()
def a():
  return spark.read.table("source").filter(...)

@dp.materialized_view()
def b():
  return spark.read.table("a").groupBy(...)

@dp.materialized_view()
def c():
  return spark.read.table("a").groupBy(...)

Ini memastikan bahwa pipeline memiliki pemahaman penuh terhadap transformasi dalam tampilan Anda selama perencanaan pipeline dan mencegah masalah potensial yang terkait dengan kode Python arbitrer yang berjalan di luar definisi kumpulan data.

Dalam fungsi Anda, Anda dapat merantai DataFrames bersama-sama untuk membuat DataFrames baru tanpa menulis hasil bertahap sebagai tampilan, tampilan materialisasi, atau tabel streaming, seperti dalam contoh berikut:

from pyspark import pipelines as dp

@dp.table()
def multiple_transformations():
  df1 = spark.read.table("source").filter(...)
  df2 = df1.groupBy(...)
  return df2.filter(...)

Jika semua DataFrame Anda melakukan pembacaan awal menggunakan logika batch, hasil pengembalian Anda adalah DataFrame statis. Jika Anda memiliki kueri yang streaming, hasil pengembalian Anda adalah DataFrame streaming.

Mengembalikan DataFrame

Gunakan @dp.table untuk membuat tabel streaming dari hasil baca streaming. Gunakan @dp.materialized_view untuk membuat tampilan materialisasi dari hasil pembacaan batch. Sebagian besar dekorator lain bekerja pada DataFrame streaming dan statis, sementara beberapa membutuhkan DataFrame streaming.

Fungsi yang digunakan untuk menentukan himpunan data harus mengembalikan Spark DataFrame. Jangan pernah menggunakan metode yang menyimpan atau menulis ke file atau tabel sebagai bagian dari kode himpunan data alur Anda.

Contoh operasi Apache Spark yang tidak boleh digunakan dalam kode alur:

  • collect()
  • count()
  • toPandas()
  • save()
  • saveAsTable()
  • start()
  • toTable()

Nota

Pipeline juga mendukung penggunaan Pandas di Spark untuk fungsi definisi himpunan data. Lihat API Pandas di Spark.

Menggunakan SQL dalam alur Python

PySpark mendukung spark.sql operator untuk menulis kode DataFrame menggunakan SQL. Saat Anda menggunakan pola ini dalam kode sumber pipeline, pola ini dikompilasi menjadi tampilan materialisasi atau tabel streaming.

Contoh kode berikut setara dengan penggunaan spark.read.table("catalog_name.schema_name.table_name") untuk logika kueri himpunan data:

@dp.materialized_view
def my_table():
  return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")

dlt.read dan dlt.read_stream (warisan)

Modul lama dlt mencakup fungsi dlt.read() dan dlt.read_stream() yang diperkenalkan untuk mendukung fungsionalitas dalam mode penerbitan pipeline legacy. Metode ini didukung, tetapi Databricks merekomendasikan untuk selalu menggunakan spark.read.table() fungsi dan spark.readStream.table() karena hal berikut:

  • Fungsi-fungsi dlt memiliki dukungan terbatas dalam membaca himpunan data yang ditentukan di luar pipa saat ini.
  • Fungsi spark mendukung menentukan opsi, seperti skipChangeCommits, untuk membaca operasi. dlt fungsi tidak mendukung penentuan opsi.
  • dlt Modul ini sendiri telah digantikan oleh pyspark.pipelines modul. Databricks merekomendasikan penggunaan from pyspark import pipelines as dp untuk diimpor pyspark.pipelines untuk digunakan saat menulis kode alur di Python.