Bagikan melalui


Referensi bahasa Python Delta Live Tables

Artikel ini memiliki detail untuk antarmuka pemrograman Python Tabel Langsung Delta.

Untuk informasi tentang API SQL, lihat referensi bahasa SQL Delta Live Tables.

Untuk detail khusus untuk mengonfigurasi Auto Loader, lihat Apa itu Auto Loader?.

Sebelum Anda memulai

Berikut ini adalah pertimbangan penting saat Anda menerapkan alur dengan antarmuka Python Delta Live Tables:

  • Karena Python table() dan view() fungsi dipanggil beberapa kali selama perencanaan dan menjalankan pembaruan alur, jangan sertakan kode dalam salah satu fungsi ini yang mungkin memiliki efek samping (misalnya, kode yang memodifikasi data atau mengirim email). Untuk menghindari perilaku tak terduga, fungsi Python Anda yang menentukan himpunan data hanya boleh menyertakan kode yang diperlukan untuk menentukan tabel atau tampilan.
  • Untuk melakukan operasi seperti mengirim email atau mengintegrasikan dengan layanan pemantauan eksternal, terutama dalam fungsi yang menentukan himpunan data, gunakan kait peristiwa. Menerapkan operasi ini dalam fungsi yang menentukan himpunan data Anda akan menyebabkan perilaku yang tidak terduga.
  • Python table dan view fungsi harus mengembalikan DataFrame. Beberapa fungsi yang beroperasi pada DataFrames tidak mengembalikan DataFrames dan tidak boleh digunakan. Operasi ini mencakup fungsi seperti collect(), , count(), toPandas()save(), dan saveAsTable(). Karena transformasi DataFrame dijalankan setelah grafik aliran data lengkap diselesaikan, menggunakan operasi tersebut mungkin memiliki efek samping yang tidak diinginkan. Namun, Anda dapat menyertakan fungsi-fungsi ini di luar table definisi fungsi atau view karena kode ini dijalankan sekali selama fase inisialisasi grafik.

dlt Mengimpor modul Python

Delta Live Tables Fungsi Python didefinisikan dalam modul dlt. Alur Anda yang diimplementasikan dengan API Python harus mengimpor modul ini:

import dlt

Membuat tampilan terwujud atau tabel streaming Tabel Langsung Delta

Di Python, Tabel Langsung Delta menentukan apakah akan memperbarui himpunan data sebagai tampilan materialisasi atau tabel streaming berdasarkan kueri yang menentukan. Dekorator @table dapat digunakan untuk menentukan tampilan materialisasi dan tabel streaming.

Untuk menentukan tampilan materialisasi di Python, terapkan @table ke kueri yang melakukan baca statis terhadap sumber data. Untuk menentukan tabel streaming, terapkan @table ke kueri yang melakukan pembacaan streaming terhadap sumber data atau gunakan fungsi create_streaming_table(). Kedua jenis himpunan data memiliki spesifikasi sintaks yang sama seperti berikut:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Membuat tampilan Tabel Langsung Delta

Untuk menentukan tampilan dengan Python, terapkan dekorator @view. Seperti dekorator @table , Anda dapat menggunakan tampilan di Tabel Langsung Delta untuk himpunan data statis atau streaming. Berikut ini adalah sintaks untuk menentukan tampilan dengan Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Contoh: Menentukan tabel dan tampilan

Untuk menentukan tabel atau tampilan di Python, terapkan @dlt.view atau @dlt.table dekorator ke fungsi. Anda dapat menggunakan nama fungsi atau parameter name untuk menetapkan nama tabel atau tampilan. Contoh berikut mendefinisikan dua himpunan data yang berbeda: tampilan bernama taxi_raw yang menggunakan file JSON sebagai sumber input dan tabel bernama filtered_data yang menggunakan tampilan taxi_raw sebagai input:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Contoh: Mengakses himpunan data yang ditentukan dalam alur yang sama

Selain membaca dari sumber data eksternal, Anda dapat mengakses himpunan data yang ditentukan dalam alur yang sama dengan fungsi Tabel read() Langsung Delta. Contoh berikut menunjukkan pembuatan himpunan customers_filtered read() data menggunakan fungsi :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Anda juga dapat menggunakan spark.table() fungsi untuk mengakses himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table() untuk mengakses himpunan data yang ditentukan dalam alur, dalam argumen fungsi tambahkan kata kunci LIVE ke nama himpunan data:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Contoh: Membaca dari tabel yang terdaftar di metastore

Untuk membaca data dari tabel yang terdaftar di metastore Apache Hive, dalam argumen fungsi, hilangkan LIVE kata kunci dan secara opsional memenuhi syarat nama tabel dengan nama database:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Untuk contoh membaca dari tabel Katalog Unity, lihat Menyerap data ke dalam alur Katalog Unity.

Contoh: Mengakses himpunan data menggunakan spark.sql

Anda juga dapat mengembalikan himpunan data menggunakan ekspresi spark.sql dalam fungsi kueri. Untuk membaca dari himpunan data internal, tambahkan LIVE. ke nama himpunan data:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Membuat tabel untuk digunakan sebagai target operasi streaming

create_streaming_table() Gunakan fungsi untuk membuat tabel target untuk output rekaman berdasarkan operasi streaming, termasuk apply_changes(), apply_changes_from_snapshot(), dan rekaman output @append_flow.

Catatan

Fungsi create_target_table() dan create_streaming_live_table() tidak digunakan lagi. Databricks merekomendasikan pembaruan kode yang telah ada untuk menggunakan fungsi create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumen
name

Jenis: str

Nama tabel.

Parameter ini diperlukan.
comment

Jenis: str

Deskripsi opsional untuk tabel.
spark_conf

Jenis: dict

Daftar opsional konfigurasi Spark untuk eksekusi kueri ini.
table_properties

Jenis: dict

Daftar opsional properti tabel untuk tabel.
partition_cols

Jenis: array

Daftar opsional dari satu atau beberapa kolom yang digunakan untuk mempartisi tabel.
path

Jenis: str

Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem default ke lokasi penyimpanan alur.
schema

Jenis: str atau StructType

Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL atau dengan Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Jenis: dict

Batasan kualitas data opsional untuk tabel. Lihat beberapa ekspektasi.

Mengontrol bagaimana tabel diwujudkan

Tabel juga menawarkan kontrol tambahan atas materialisasinya:

  • Tentukan bagaimana tabel dipartisi menggunakan partition_cols. Anda dapat menggunakan partisi untuk mempercepat kueri.
  • Anda dapat mengatur properti tabel saat Anda menentukan tampilan atau tabel. Lihat Properti tabel Tabel Langsung Delta.
  • Atur lokasi penyimpanan untuk data tabel menggunakan pengaturan path. Secara default, data tabel disimpan di lokasi penyimpanan alur jika path tidak diatur.
  • Anda dapat menggunakan kolom yang dihasilkan dalam definisi skema Anda. Lihat Contoh: Menentukan skema dan kolom partisi.

Catatan

Untuk tabel berukuran kurang dari 1 TB, Databricks merekomendasikan untuk mengizinkan Delta Live Table mengontrol organisasi data. Anda tidak boleh menentukan kolom partisi kecuali jika Anda mengharapkan tabel Anda tumbuh di luar terabyte.

Contoh: Tentukan skema dan kolom partisi

Anda dapat secara opsional menentukan skema tabel menggunakan Python StructType atau untai (karakter) SQL DDL. Ketika ditentukan dengan string DDL, definisi dapat menyertakan kolom yang dihasilkan.

Contoh berikut membuat tabel yang disebut sales dengan skema yang ditentukan menggunakan Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Contoh berikut menentukan skema untuk tabel menggunakan string DDL, menentukan kolom yang dihasilkan, dan menentukan kolom partisi:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Secara default, Delta Live Tables menyimpulkan skema dari definisi table jika Anda tidak menentukan skema.

Mengonfigurasi tabel streaming untuk mengabaikan perubahan dalam tabel streaming sumber

Catatan

  • skipChangeCommits Bendera hanya berfungsi dengan spark.readStream menggunakan option() fungsi . Anda tidak dapat menggunakan bendera ini dalam fungsi dlt.read_stream() .
  • Anda tidak dapat menggunakan skipChangeCommits bendera saat tabel streaming sumber didefinisikan sebagai target fungsi apply_changes().

Secara default, tabel streaming memerlukan sumber khusus tambahan. Saat tabel streaming menggunakan tabel streaming lain sebagai sumber, dan tabel streaming sumber memerlukan pembaruan atau penghapusan, misalnya, pemrosesan GDPR "hak untuk dilupakan", skipChangeCommits bendera dapat diatur saat membaca tabel streaming sumber untuk mengabaikan perubahan tersebut. Untuk informasi selengkapnya tentang bendera ini, lihat Mengabaikan pembaruan dan penghapusan.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Properti Tabel Langsung Python Delta

Tabel berikut ini menjelaskan opsi dan properti yang bisa Anda tentukan saat menentukan tabel dan tampilan dengan Tabel Langsung Delta:

@table atau @view
name

Jenis: str

Nama opsional untuk tabel atau tampilan. Jika tidak ditentukan, nama fungsi digunakan sebagai nama tabel atau tampilan.
comment

Jenis: str

Deskripsi opsional untuk tabel.
spark_conf

Jenis: dict

Daftar opsional konfigurasi Spark untuk eksekusi kueri ini.
table_properties

Jenis: dict

Daftar opsional properti tabel untuk tabel.
path

Jenis: str

Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem default ke lokasi penyimpanan alur.
partition_cols

Jenis: a collection of str

Koleksi opsional, misalnya, dari satu atau beberapa list kolom yang akan digunakan untuk mempartisi tabel.
schema

Jenis: str atau StructType

Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL atau dengan Python
StructType.
temporary

Jenis: bool

Buat tabel, tetapi jangan terbitkan metadata untuk tabel. Kata temporary kunci menginstruksikan Tabel Langsung Delta untuk membuat tabel yang tersedia untuk alur tetapi tidak boleh diakses di luar alur. Untuk mengurangi waktu pemrosesan, tabel sementara bertahan selama masa pakai alur yang membuatnya, dan bukan hanya satu pembaruan.

Defaultnya adalah false.
Definisi tabel atau tampilan
def <function-name>()

Fungsi Python yang mendefinisikan himpunan data. Jika parameter name tidak diatur, maka <function-name> digunakan sebagai nama himpunan data target.
query

Pernyataan SQL Spark yang mengembalikan Spark Dataset atau Koalas DataFrame.

Gunakan dlt.read() atau spark.table() untuk melakukan pembacaan lengkap dari himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table() untuk membaca dari himpunan data yang ditentukan dalam alur yang sama, tambahkan kata kunci LIVE ke nama himpunan data dalam argumen fungsi. Misalnya, untuk membaca dari himpunan data bernama customers:

spark.table("LIVE.customers")

Anda juga dapat menggunakan fungsi spark.table() untuk membaca dari tabel yang terdaftar di metastore dengan menghilangkan kata kunci LIVE dan secara opsional mengkualifikasikan nama tabel dengan nama database:

spark.table("sales.customers")

Gunakan dlt.read_stream() untuk melakukan pembacaan streaming dari himpunan data yang ditentukan dalam alur yang sama.

Gunakan fungsi spark.sql untuk menentukan kueri SQL guna membuat himpunan data kembali.

Gunakan sintaks PySpark untuk menentukan kueri Delta Live Tables dengan Python.
Ekspektasi
@expect("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika baris melanggar harapan, sertakan baris dalam himpunan data target.
@expect_or_drop("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika baris melanggar harapan, jatuhkan baris dari himpunan data target.
@expect_or_fail("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika berturut-turut melanggar harapan, segera hentikan eksekusi.
@expect_all(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, sertakan baris dalam himpunan data target.
@expect_all_or_drop(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, hapus baris dari himpunan data target.
@expect_all_or_fail(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, segera hentikan eksekusi.

Mengubah pengambilan data dari umpan perubahan dengan Python di Tabel Langsung Delta

apply_changes() Gunakan fungsi di Python API untuk menggunakan fungsionalitas pengambilan data perubahan (CDC) Tabel Langsung Delta untuk memproses data sumber dari umpan data perubahan (CDF).

Penting

Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema apply_changes() tabel target, Anda harus menyertakan __START_AT kolom dan __END_AT dengan tipe sequence_by data yang sama dengan bidang.

Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka Python Tabel Langsung Delta.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Catatan

Untuk APPLY CHANGES pemrosesan, perilaku default untuk INSERT peristiwa dan UPDATE adalah meningkatkan peristiwa CDC dari sumbernya: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan kondisi tersebut APPLY AS DELETE WHEN.

Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan umpan perubahan, lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Tabel Langsung Delta. Untuk contoh penggunaan apply_changes() fungsi, lihat Contoh: pemrosesan SCD tipe 1 dan SCD tipe 2 dengan data sumber CDF.

Penting

Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan apply_changes skema tabel target, Anda harus menyertakan __START_AT kolom dan __END_AT dengan jenis sequence_by data yang sama dengan bidang .

Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Delta Live Tables.

Argumen
target

Jenis: str

Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan apply_changes() fungsi.

Parameter ini diperlukan.
source

Jenis: str

Sumber data yang berisi catatan CDC.

Parameter ini diperlukan.
keys

Jenis: list

Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk catatan tertentu dalam tabel target.

Anda dapat menentukan:

* Daftar untai (karakter): ["userId", "orderId"]
* Daftar fungsi Spark SQLcol() fungsi: [col("userId"), col("orderId"]

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini diperlukan.
sequence_by

Jenis: str atau col()

Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Delta Live Tables menggunakan pengurutan ini untuk menangani peristiwa perubahan yang tidak sesuai pesanan.

Anda dapat menentukan:

* Untai (karakter): "sequenceNum"
* Spark SQL col() fungsi: col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini diperlukan.
ignore_null_updates

Jenis: bool

Izinkan menyerap pembaruan yang berisi subset kolom target. Saat peristiwa CDC cocok dengan baris yang ada dan ignore_null_updates adalah True, kolom dengan null mempertahankan nilai yang ada dalam target. Ini juga berlaku untuk kolom bersarang dengan nilai null. Ketika ignore_null_updates adalah False, nilai yang ada ditimpa dengan null nilai.

Parameter ini bersifat opsional.

Default adalah False.
apply_as_deletes

Jenis: str atau expr()

Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert. Untuk menangani data yang tidak sesuai urutan, baris yang dihapus untuk sementara disimpan sebagai penanda di tabel Delta yang mendasarinya, dan tampilan dibuat di metastore yang menyaring penanda ini. Interval retensi dapat dikonfigurasi dengan
pipelines.cdc.tombstoneGCThresholdInSecondsproperti tabel.

Anda dapat menentukan:

* Untai (karakter): "Operation = 'DELETE'"
* Spark SQL expr() fungsi: expr("Operation = 'DELETE'")

Parameter ini bersifat opsional.
apply_as_truncates

Jenis: str atau expr()

Menentukan kapan acara CDC harus diperlakukan sebagai tabel lengkap TRUNCATE. Karena klausa ini memicu pemotongan penuh tabel target, klausa ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.

Parameter apply_as_truncates hanya didukung untuk SCD jenis 1. SCD tipe 2 tidak mendukung operasi pemotongan.

Anda dapat menentukan:

* Untai (karakter): "Operation = 'TRUNCATE'"
* Spark SQL expr() fungsi: expr("Operation = 'TRUNCATE'")

Parameter ini bersifat opsional.
column_list

except_column_list

Jenis: list

Subset kolom untuk disertakan dalam tabel target. Gunakan column_list untuk menentukan daftar lengkap kolom untuk disertakan. Gunakan except_column_list untuk menentukan kolom yang akan dikecualikan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col():

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini bersifat opsional.

Defaultnya adalah memasukkan semua kolom dalam tabel target saat tidak ada column_list atau except_column_list argumen diteruskan ke fungsi.
stored_as_scd_type

Jenis: str atau int

Apakah Anda akan menyimpan rekaman sebagai SCD jenis 1 atau SCD jenis 2.

Atur ke 1 untuk SCD jenis 1 atau 2 untuk SCD jenis 2.

Klausa ini opsional.

Defaultnya adalah SCD jenis 1.
track_history_column_list

track_history_except_column_list

Jenis: list

Subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Menggunakan
track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini bersifat opsional.

Defaultnya adalah menyertakan semua kolom dalam tabel target saat tidak ada track_history_column_list atau
track_history_except_column_list argumen diteruskan ke fungsi.

Mengubah pengambilan data dari rekam jepret database dengan Python di Tabel Langsung Delta

Penting

APPLY CHANGES FROM SNAPSHOT API berada di Pratinjau Umum.

apply_changes_from_snapshot() Gunakan fungsi di Python API untuk menggunakan fungsionalitas Tangkapan data perubahan (CDC) Tabel Langsung Delta untuk memproses data sumber dari rekam jepret database.

Penting

Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes_from_snapshot(), Anda juga harus menyertakan kolom __START_AT dan __END_AT dengan jenis data yang sama dengan bidang sequence_by.

Untuk membuat tabel target yang diperlukan, Anda dapat menggunakan fungsi create_streaming_table() di antarmuka Python Tabel Langsung Delta.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Catatan

Untuk APPLY CHANGES FROM SNAPSHOT pemrosesan, perilaku defaultnya adalah menyisipkan baris baru saat rekaman yang cocok dengan kunci yang sama tidak ada di target. Jika rekaman yang cocok memang ada, rekaman tersebut hanya diperbarui jika salah satu nilai dalam baris telah berubah. Baris dengan kunci yang ada di target tetapi tidak lagi ada di sumber dihapus.

Untuk mempelajari selengkapnya tentang pemrosesan CDC dengan rekam jepret, lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data dengan Tabel Langsung Delta. Untuk contoh penggunaan apply_changes_from_snapshot() fungsi, lihat contoh penyerapan rekam jepret berkala dan penyerapan rekam jepret historis.

Argumen
target

Jenis: str

Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan apply_changes() fungsi.

Parameter ini diperlukan.
source

Jenis: str atau lambda function

Baik nama tabel atau tampilan untuk rekam jepret secara berkala atau fungsi lambda Python yang mengembalikan DataFrame rekam jepret yang akan diproses dan versi rekam jepret. Lihat Menerapkan argumen sumber.

Parameter ini diperlukan.
keys

Jenis: list

Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk catatan tertentu dalam tabel target.

Anda dapat menentukan:

* Daftar untai (karakter): ["userId", "orderId"]
* Daftar fungsi Spark SQLcol() fungsi: [col("userId"), col("orderId"]

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini diperlukan.
stored_as_scd_type

Jenis: str atau int

Apakah Anda akan menyimpan rekaman sebagai SCD jenis 1 atau SCD jenis 2.

Atur ke 1 untuk SCD jenis 1 atau 2 untuk SCD jenis 2.

Klausa ini opsional.

Defaultnya adalah SCD jenis 1.
track_history_column_list

track_history_except_column_list

Jenis: list

Subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Menggunakan
track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini bersifat opsional.

Defaultnya adalah menyertakan semua kolom dalam tabel target saat tidak ada track_history_column_list atau
track_history_except_column_list argumen diteruskan ke fungsi.

source Menerapkan argumen

Fungsi ini apply_changes_from_snapshot() mencakup source argumen . Untuk memproses rekam jepret historis, source argumen diharapkan menjadi fungsi lambda Python yang mengembalikan dua nilai ke apply_changes_from_snapshot() fungsi: Python DataFrame yang berisi data rekam jepret yang akan diproses dan versi rekam jepret.

Berikut ini adalah tanda tangan fungsi lambda:

lambda Any => Optional[(DataFrame, Any)]
  • Argumen untuk fungsi lambda adalah versi rekam jepret yang terakhir diproses.
  • Nilai pengembalian fungsi lambda adalah None atau tuple dari dua nilai: Nilai pertama tuple adalah DataFrame yang berisi rekam jepret yang akan diproses. Nilai kedua dari tuple adalah versi rekam jepret yang mewakili urutan logis rekam jepret.

Contoh yang mengimplementasikan dan memanggil fungsi lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Runtime Tabel Langsung Delta melakukan langkah-langkah berikut setiap kali alur yang berisi fungsi dipicu apply_changes_from_snapshot() :

  1. next_snapshot_and_version Menjalankan fungsi untuk memuat DataFrame rekam jepret berikutnya dan versi rekam jepret yang sesuai.
  2. Jika tidak ada DataFrame yang dikembalikan, eksekusi dihentikan dan pembaruan alur ditandai sebagai selesai.
  3. Mendeteksi perubahan dalam rekam jepret baru dan menerapkannya secara bertahap ke tabel target.
  4. Kembali ke langkah #1 untuk memuat rekam jepret berikutnya dan versinya.

Batasan

Antarmuka Python Tabel Langsung Delta memiliki batasan berikut:

Fungsi pivot() tidak didukung. pivot Operasi di Spark memerlukan pemuatan data input yang bersemangat untuk menghitung skema output. Kemampuan ini tidak didukung dalam Tabel Langsung Delta.