Bagikan melalui


Memuat dan memproses data dalam tahapan bertingkat dengan aliran Pipeline Deklaratif Lakeflow Spark

Data diproses dalam jalur melalui alur. Setiap alur terdiri dari kueri dan, biasanya, target. Alur memproses kueri, baik sebagai batch, atau secara bertahap sebagai aliran data ke dalam target. Aliran berada dalam pipeline di Alur Deklaratif Lakeflow Spark.

Biasanya, alur ditentukan secara otomatis saat Anda membuat kueri dalam alur yang memperbarui target, tetapi Anda juga dapat secara eksplisit menentukan alur tambahan untuk pemrosesan yang lebih kompleks, seperti menambahkan ke satu target dari beberapa sumber.

Pembaruan

Alur dijalankan setiap kali pipa pengaturannya diperbarui. Alur akan membuat atau memperbarui tabel dengan data terbaru yang tersedia. Bergantung pada jenis alur dan status perubahan pada data, pembaruan dapat melakukan refresh bertahap, yang hanya memproses rekaman baru, atau melakukan refresh penuh, yang memproses ulang semua rekaman dari sumber data.

Membuat alur default

Saat membuat alur, Anda biasanya menentukan tabel atau tampilan bersama dengan kueri yang mendukungnya. Misalnya, dalam kueri SQL ini, Anda membuat tabel streaming yang disebut customers_silver dengan membaca dari tabel yang disebut customers_bronze.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Anda juga dapat membuat tabel streaming yang sama di Python. Di Python, Anda menggunakan pipeline dengan membuat fungsi kueri yang mengembalikan dataframe, dengan dekorator untuk menambahkan fungsionalitas Lakeflow Spark Declarative Pipelines:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

Dalam contoh ini, Anda membuat tabel streaming. Anda juga dapat membuat tampilan materialisasi dengan sintaks serupa di SQL dan Python. Untuk informasi selengkapnya, lihat Tabel streaming dan Tampilan materialisasi.

Contoh ini membuat alur default bersama dengan tabel streaming. Alur default untuk tabel streaming adalah alur penambahan , yang menambahkan baris baru dengan setiap pemicu. Ini adalah cara paling umum untuk menggunakan pipeline: membuat aliran dan sasaran dalam satu langkah. Anda dapat menggunakan gaya ini untuk menyerap data atau mengubah data.

Menambahkan alur juga mendukung pemrosesan yang memerlukan pembacaan data dari beberapa sumber streaming untuk memperbarui satu target. Misalnya, Anda dapat menggunakan fungsionalitas alur penambahan saat Anda memiliki tabel dan alur streaming yang ada dan ingin menambahkan sumber streaming baru yang menulis ke tabel streaming yang ada ini.

Menggunakan beberapa alur untuk menulis ke satu target

Dalam contoh sebelumnya, Anda membuat alur dan tabel streaming dalam satu langkah. Anda juga dapat membuat alur untuk tabel yang dibuat sebelumnya. Dalam contoh ini, Anda dapat melihat pembuatan tabel dan alur yang terkait dengannya dalam langkah terpisah. Kode ini memiliki hasil yang identik dengan membuat alur default, termasuk menggunakan nama yang sama untuk tabel streaming dan alur.

Phyton

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Membuat alur secara independen dari target berarti Anda juga dapat membuat beberapa alur yang menambahkan data ke target yang sama.

@dp.append_flow Gunakan dekorator di antarmuka Python atau CREATE FLOW...INSERT INTO klausa di antarmuka SQL untuk membuat alur baru, misalnya untuk menargetkan tabel streaming dari beberapa sumber streaming. Gunakan alur tambahan untuk memproses tugas seperti berikut ini:

  • Tambahkan sumber streaming yang menambahkan data ke tabel streaming yang ada tanpa memerlukan refresh penuh. Misalnya, Anda mungkin memiliki tabel yang menggabungkan data regional dari setiap wilayah tempat Anda beroperasi. Saat wilayah baru diluncurkan, Anda dapat menambahkan data wilayah baru ke tabel tanpa melakukan refresh penuh. Untuk contoh menambahkan sumber streaming ke tabel streaming yang ada, lihat Contoh: Menulis ke tabel streaming dari beberapa topik Kafka.
  • Perbarui tabel streaming dengan menambahkan data historis yang hilang (backfilling). Anda dapat menggunakan sintaks INSERT INTO ONCE untuk membuat pengisian ulang historis tambahan yang dijalankan satu kali. Misalnya, Anda memiliki tabel streaming yang sudah ada yang ditulis oleh topik Apache Kafka. Anda juga memiliki data historis yang disimpan dalam tabel yang perlu Anda sisipkan tepat sekali ke dalam tabel streaming, dan Anda tidak dapat mengalirkan data karena pemrosesan Anda mencakup melakukan agregasi kompleks sebelum menyisipkan data. Untuk contoh isi ulang, lihat Mengisi ulang data historis dengan alur.
  • Gabungkan data dari beberapa sumber dan tulis ke satu tabel streaming alih-alih menggunakan UNION klausa dalam kueri. Menggunakan pemrosesan alur tambahan alih-alih UNION memungkinkan Anda memperbarui tabel target secara bertahap tanpa menjalankan pembaruan refresh penuh. Untuk contoh penggabungan yang dilakukan dengan cara ini, lihat Contoh: Menggunakan pemrosesan alir susun alih-alih UNION.

Target untuk output rekaman oleh pemrosesan alur tambahan dapat berupa tabel yang sudah ada atau tabel baru. Untuk kueri Python, gunakan fungsi create_streaming_table() untuk membuat tabel target.

Contoh berikut menambahkan dua alur untuk target yang sama, membuat penyatuan dari dua tabel sumber:

Phyton

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Penting

  • Jika Anda perlu menentukan batasan kualitas data dengan harapan, tentukan harapan pada tabel target sebagai bagian create_streaming_table() dari fungsi atau pada definisi tabel yang ada. Anda tidak dapat menentukan ekspektasi dalam @append_flow definisi.
  • Alur diidentifikasi dengan nama alur, dan nama ini digunakan untuk mengidentifikasi titik pemeriksaan streaming. Penggunaan nama alur untuk mengidentifikasi titik pemeriksaan berarti sebagai berikut:
    • Jika sebuah alur yang ada dalam pipeline diganti namanya, titik pemeriksaan tidak dilanjutkan, dan alur yang diganti namanya tersebut secara efektif menjadi alur baru yang sepenuhnya berbeda.
    • Anda tidak dapat menggunakan kembali nama aliran dalam pipa, karena pos pemeriksaan yang ada tidak akan cocok dengan definisi aliran baru.

Jenis alur

Alur bawaan untuk tabel streaming dan tampilan materialisasi adalah alur penambahan. Anda juga dapat membuat alur untuk membaca dari sumber data pengambilan data perubahan. Tabel berikut ini menjelaskan berbagai jenis alur.

Jenis alur Description
Append Alur tambahan adalah jenis alur yang paling umum, di mana rekaman baru di sumber ditulis ke target dengan setiap pembaruan. Mereka sesuai dengan mode tambahan dalam streaming terstruktur. Anda dapat menambahkan penanda ONCE sebagai petunjuk untuk kueri batch yang datanya harus dimasukkan ke dalam target hanya sekali, kecuali jika target sepenuhnya diperbarui. Sejumlah alur append dapat menulis ke target tertentu.
Alur default (dibuat dengan tabel streaming target atau tampilan materialisasi) akan memiliki nama yang sama dengan target. Target lain tidak memiliki alur default.
CDC Otomatis (sebelumnya menerapkan perubahan) Aliran CDC Otomatis menyerap kueri yang berisi change data capture (CDC). Aliran CDC otomatis hanya dapat menargetkan tabel streaming, dan sumbernya harus berupa sumber streaming (bahkan dalam kasus aliran ONCE). Beberapa alur CDC otomatis dapat menargetkan satu tabel streaming. Tabel streaming yang bertindak sebagai target untuk alur CDC otomatis hanya dapat ditargetkan oleh alur CDC otomatis lainnya.
Untuk informasi selengkapnya tentang data CDC, lihat API CDC OTOMATIS: Menyederhanakan perubahan pengambilan data dengan alur.

Informasi Tambahan

Untuk informasi selengkapnya tentang alur dan penggunaannya, lihat topik berikut: