Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Modul pyspark.pipelines (di sini dikenal sebagai dp) mengimplementasikan banyak dari fungsionalitas intinya dengan menggunakan pendekorasi. Dekorator ini menerima fungsi yang menentukan kueri streaming atau batch dan mengembalikan Apache Spark DataFrame. Sintaks berikut menunjukkan contoh sederhana untuk menentukan himpunan data alur:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
Halaman ini menyediakan gambaran umum fungsi dan kueri yang menentukan himpunan data dalam alur. Untuk daftar lengkap dekorator yang tersedia, silakan merujuk ke Referensi Pengembang Pipeline.
Fungsi yang Anda gunakan untuk menentukan himpunan data tidak boleh menyertakan logika Python arbitrer yang tidak terkait dengan himpunan data, termasuk panggilan ke API pihak ketiga. Pipeline menjalankan fungsi ini beberapa kali selama perencanaan, validasi, dan pembaruan. Termasuk logika arbitrer dapat menyebabkan hasil yang tidak terduga.
Membaca data untuk memulai definisi himpunan data
Fungsi yang digunakan untuk menentukan himpunan data alur biasanya dimulai dengan operasi spark.read atau spark.readStream. Operasi baca ini mengembalikan objek DataFrame statis atau streaming yang Anda gunakan untuk menentukan transformasi tambahan sebelum mengembalikan DataFrame. Contoh lain dari operasi spark yang mengembalikan DataFrame termasuk spark.table, atau spark.range.
Fungsi tidak boleh mereferensikan DataFrames yang ditentukan di luar fungsi. Mencoba mereferensikan DataFrames yang ditentukan pada cakupan yang berbeda dapat mengakibatkan perilaku yang tidak terduga. Untuk contoh pola metaprogram untuk membuat beberapa tabel, lihat Membuat tabel dalam perulanganfor.
Contoh berikut menunjukkan sintaks dasar untuk membaca data menggunakan logika batch atau streaming:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
Jika Anda perlu membaca data dari REST API eksternal, terapkan koneksi ini menggunakan sumber data kustom Python. Lihat Sumber data kustom PySpark.
Nota
Dimungkinkan untuk membuat kerangka data Apache Spark apa saja dari kumpulan data Python, termasuk kerangka data pandas, dicts, dan daftar. Pola-pola ini mungkin berguna selama pengembangan dan pengujian, tetapi sebagian besar definisi himpunan data alur produksi harus dimulai dengan memuat data dari file, sistem eksternal, atau tabel atau tampilan yang ada.
Transformasi berantai
Alur mendukung hampir semua transformasi Apache Spark DataFrame. Anda dapat menyertakan sejumlah transformasi dalam fungsi definisi himpunan data Anda, tetapi Anda harus memastikan bahwa metode yang Anda gunakan selalu mengembalikan objek DataFrame.
Jika Anda memiliki transformasi perantara yang mendorong beberapa beban kerja hilir tetapi Anda tidak perlu mewujudkannya sebagai tabel, gunakan @dp.temporary_view() untuk menambahkan tampilan sementara ke alur Anda. Anda kemudian dapat mereferensikan tampilan ini menggunakan spark.read.table("temp_view_name") dalam beberapa definisi himpunan data hilir. Sintaks berikut menunjukkan pola ini:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
Ini memastikan bahwa pipeline memiliki pemahaman penuh terhadap transformasi dalam tampilan Anda selama perencanaan pipeline dan mencegah masalah potensial yang terkait dengan kode Python arbitrer yang berjalan di luar definisi kumpulan data.
Dalam fungsi Anda, Anda dapat merantai DataFrames bersama-sama untuk membuat DataFrames baru tanpa menulis hasil bertahap sebagai tampilan, tampilan materialisasi, atau tabel streaming, seperti dalam contoh berikut:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
Jika semua DataFrame Anda melakukan pembacaan awal menggunakan logika batch, hasil pengembalian Anda adalah DataFrame statis. Jika Anda memiliki kueri yang streaming, hasil pengembalian Anda adalah DataFrame streaming.
Mengembalikan DataFrame
Gunakan @dp.table untuk membuat tabel streaming dari hasil baca streaming. Gunakan @dp.materialized_view untuk membuat tampilan materialisasi dari hasil pembacaan batch. Sebagian besar dekorator lain bekerja pada DataFrame streaming dan statis, sementara beberapa membutuhkan DataFrame streaming.
Fungsi yang digunakan untuk menentukan himpunan data harus mengembalikan Spark DataFrame. Jangan pernah menggunakan metode yang menyimpan atau menulis ke file atau tabel sebagai bagian dari kode himpunan data alur Anda.
Contoh operasi Apache Spark yang tidak boleh digunakan dalam kode alur:
collect()count()toPandas()save()saveAsTable()start()toTable()
Nota
Pipeline juga mendukung penggunaan Pandas di Spark untuk fungsi definisi himpunan data. Lihat API Pandas di Spark.
Menggunakan SQL dalam alur Python
PySpark mendukung spark.sql operator untuk menulis kode DataFrame menggunakan SQL. Saat Anda menggunakan pola ini dalam kode sumber pipeline, pola ini dikompilasi menjadi tampilan materialisasi atau tabel streaming.
Contoh kode berikut setara dengan penggunaan spark.read.table("catalog_name.schema_name.table_name") untuk logika kueri himpunan data:
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read dan dlt.read_stream (warisan)
Modul lama dlt mencakup fungsi dlt.read() dan dlt.read_stream() yang diperkenalkan untuk mendukung fungsionalitas dalam mode penerbitan pipeline legacy. Metode ini didukung, tetapi Databricks merekomendasikan untuk selalu menggunakan spark.read.table() fungsi dan spark.readStream.table() karena hal berikut:
- Fungsi-fungsi
dltmemiliki dukungan terbatas dalam membaca himpunan data yang ditentukan di luar pipa saat ini. - Fungsi
sparkmendukung menentukan opsi, sepertiskipChangeCommits, untuk membaca operasi.dltfungsi tidak mendukung penentuan opsi. -
dltModul ini sendiri telah digantikan olehpyspark.pipelinesmodul. Databricks merekomendasikan penggunaanfrom pyspark import pipelines as dpuntuk diimporpyspark.pipelinesuntuk digunakan saat menulis kode alur di Python.