Referensi bahasa Python Delta Live Tables
Artikel ini menyediakan detail untuk antarmuka pemrograman Python Tabel Langsung Delta.
Untuk informasi tentang API SQL, lihat referensi bahasa SQL Delta Live Tables.
Untuk detail khusus untuk mengonfigurasi Auto Loader, lihat Apa itu Auto Loader?.
Batasan
Antarmuka Python Tabel Langsung Delta memiliki batasan berikut:
- Python
table
danview
fungsi harus mengembalikan DataFrame. Beberapa fungsi yang beroperasi pada DataFrames tidak mengembalikan DataFrames dan tidak boleh digunakan. Karena transformasi DataFrame dijalankan setelah grafik aliran data lengkap diselesaikan, menggunakan operasi tersebut mungkin memiliki efek samping yang tidak diinginkan. Operasi ini mencakup fungsi seperticollect()
, ,count()
,toPandas()
save()
, dansaveAsTable()
. Namun, Anda dapat menyertakan fungsi-fungsi ini di luartable
definisi fungsi atauview
karena kode ini dijalankan sekali selama fase inisialisasi grafik. - Fungsi
pivot()
tidak didukung.pivot
Operasi di Spark memerlukan pemuatan data input yang bersemangat untuk menghitung skema output. Kemampuan ini tidak didukung dalam Tabel Langsung Delta.
dlt
Mengimpor modul Python
Delta Live Tables Fungsi Python didefinisikan dalam modul dlt
. Alur Anda yang diimplementasikan dengan API Python harus mengimpor modul ini:
import dlt
Membuat tampilan terwujud atau tabel streaming Tabel Langsung Delta
Di Python, Tabel Langsung Delta menentukan apakah akan memperbarui himpunan data sebagai tampilan materialisasi atau tabel streaming berdasarkan kueri yang menentukan. Dekorator @table
digunakan untuk menentukan tampilan materialisasi dan tabel streaming.
Untuk menentukan tampilan materialisasi di Python, terapkan @table
ke kueri yang melakukan baca statis terhadap sumber data. Untuk menentukan tabel streaming, terapkan @table
ke kueri yang melakukan pembacaan streaming terhadap sumber data. Kedua jenis himpunan data memiliki spesifikasi sintaks yang sama seperti berikut:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Membuat tampilan Tabel Langsung Delta
Untuk menentukan tampilan dengan Python, terapkan dekorator @view
. Seperti dekorator @table
, Anda dapat menggunakan tampilan di Tabel Langsung Delta untuk himpunan data statis atau streaming. Berikut ini adalah sintaks untuk menentukan tampilan dengan Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Contoh: Menentukan tabel dan tampilan
Untuk menentukan tabel atau tampilan di Python, terapkan @dlt.view
atau @dlt.table
dekorator ke fungsi. Anda dapat menggunakan nama fungsi atau parameter name
untuk menetapkan nama tabel atau tampilan. Contoh berikut mendefinisikan dua himpunan data yang berbeda: tampilan bernama taxi_raw
yang menggunakan file JSON sebagai sumber input dan tabel bernama filtered_data
yang menggunakan tampilan taxi_raw
sebagai input:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Contoh: Mengakses himpunan data yang ditentukan dalam alur yang sama
Selain membaca dari sumber data eksternal, Anda dapat mengakses himpunan data yang ditentukan dalam alur yang sama dengan fungsi Tabel read()
Langsung Delta. Contoh berikut menunjukkan pembuatan himpunan customers_filtered
read()
data menggunakan fungsi :
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
Anda juga dapat menggunakan spark.table()
fungsi untuk mengakses himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table()
untuk mengakses himpunan data yang ditentukan dalam alur, dalam argumen fungsi tambahkan kata kunci LIVE
ke nama himpunan data:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Contoh: Membaca dari tabel yang terdaftar di metastore
Untuk membaca data dari tabel yang terdaftar di metastore Apache Hive, dalam argumen fungsi hilangkan LIVE
kata kunci dan secara opsional memenuhi syarat nama tabel dengan nama database:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Untuk contoh membaca dari tabel Katalog Unity, lihat Menyerap data ke dalam alur Katalog Unity.
Contoh: Mengakses himpunan data menggunakan spark.sql
Anda juga dapat mengembalikan himpunan data menggunakan ekspresi spark.sql
dalam fungsi kueri. Untuk membaca dari himpunan data internal, tambahkan LIVE.
ke nama himpunan data:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Membuat tabel untuk digunakan sebagai target operasi streaming
create_streaming_table()
Gunakan fungsi untuk membuat tabel target untuk output rekaman berdasarkan operasi streaming, termasuk apply_changes() dan rekaman output @append_flow.
Catatan
Fungsi create_target_table()
dan create_streaming_live_table()
tidak digunakan lagi. Databricks merekomendasikan pembaruan kode yang telah ada untuk menggunakan fungsi create_streaming_table()
.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumen |
---|
name Jenis: str Nama tabel. Parameter ini diperlukan. |
comment Jenis: str Deskripsi opsional untuk tabel. |
spark_conf Jenis: dict Daftar opsional konfigurasi Spark untuk eksekusi kueri ini. |
table_properties Jenis: dict Daftar opsional properti tabel untuk tabel. |
partition_cols Jenis: array Daftar opsional dari satu atau beberapa kolom yang digunakan untuk mempartisi tabel. |
path Jenis: str Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan default ke lokasi penyimpanan alur. |
schema Jenis: str atau StructType Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL, atau dengan Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Jenis: dict Batasan kualitas data opsional untuk tabel. Lihat beberapa ekspektasi. |
Mengontrol bagaimana tabel diwujudkan
Tabel juga menawarkan kontrol tambahan atas materialisasinya:
- Tentukan bagaimana tabel dipartisi menggunakan
partition_cols
. Anda dapat menggunakan partisi untuk mempercepat kueri. - Anda dapat mengatur properti tabel saat Anda menentukan tampilan atau tabel. Lihat Properti tabel Tabel Langsung Delta.
- Atur lokasi penyimpanan untuk data tabel menggunakan pengaturan
path
. Secara default, data tabel disimpan di lokasi penyimpanan alur jikapath
tidak diatur. - Anda dapat menggunakan kolom yang dihasilkan dalam definisi skema Anda. Lihat Contoh: Menentukan skema dan kolom partisi.
Catatan
Untuk tabel berukuran kurang dari 1 TB, Databricks merekomendasikan untuk mengizinkan Delta Live Table mengontrol organisasi data. Kecuali Anda mengharapkan tabel Anda tumbuh di luar terabyte, Anda umumnya tidak boleh menentukan kolom partisi.
Contoh: Tentukan skema dan kolom partisi
Anda dapat secara opsional menentukan skema tabel menggunakan Python StructType
atau untai (karakter) SQL DDL. Ketika ditentukan dengan string DDL, definisi dapat menyertakan kolom yang dihasilkan.
Contoh berikut membuat tabel yang disebut sales
dengan skema yang ditentukan menggunakan Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
Contoh berikut menentukan skema untuk tabel menggunakan string DDL, menentukan kolom yang dihasilkan, dan menentukan kolom partisi:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Secara default, Delta Live Tables menyimpulkan skema dari definisi table
jika Anda tidak menentukan skema.
Mengonfigurasi tabel streaming untuk mengabaikan perubahan dalam tabel streaming sumber
Catatan
skipChangeCommits
Bendera hanya berfungsi denganspark.readStream
menggunakanoption()
fungsi . Anda tidak dapat menggunakan bendera ini dalam fungsidlt.read_stream()
.- Anda tidak dapat menggunakan
skipChangeCommits
bendera saat tabel streaming sumber didefinisikan sebagai target fungsi apply_changes().
Secara default, tabel streaming memerlukan sumber khusus tambahan. Saat tabel streaming menggunakan tabel streaming lain sebagai sumber, dan tabel streaming sumber memerlukan pembaruan atau penghapusan, misalnya, pemrosesan GDPR "hak untuk dilupakan", skipChangeCommits
bendera dapat diatur saat membaca tabel streaming sumber untuk mengabaikan perubahan tersebut. Untuk informasi selengkapnya tentang bendera ini, lihat Mengabaikan pembaruan dan penghapusan.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Properti Tabel Langsung Python Delta
Tabel berikut ini menjelaskan opsi dan properti yang bisa Anda tentukan saat menentukan tabel dan tampilan dengan Tabel Langsung Delta:
@table atau @view |
---|
name Jenis: str Nama opsional untuk tabel atau tampilan. Jika tidak ditentukan, nama fungsi digunakan sebagai nama tabel atau tampilan. |
comment Jenis: str Deskripsi opsional untuk tabel. |
spark_conf Jenis: dict Daftar opsional konfigurasi Spark untuk eksekusi kueri ini. |
table_properties Jenis: dict Daftar opsional properti tabel untuk tabel. |
path Jenis: str Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan default ke lokasi penyimpanan alur. |
partition_cols Jenis: a collection of str Koleksi opsional, misalnya, list , dari satu atau beberapa kolom yang akan digunakan untuk mempartisi tabel. |
schema Jenis: str atau StructType Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL, atau dengan Python StructType . |
temporary Jenis: bool Buat tabel tetapi jangan terbitkan metadata untuk tabel. Kata temporary kunci menginstruksikan Tabel Langsung Delta untuk membuat tabel yang tersedia untuk alur tetapi tidak boleh diakses di luar alur. Untuk mengurangi waktu pemrosesan, tabel sementara bertahan selama masa pakai alur yang membuatnya, dan bukan hanya satu pembaruan.Defaultnya adalah false. |
Definisi tabel atau tampilan |
---|
def <function-name>() Fungsi Python yang mendefinisikan himpunan data. Jika parameter name tidak diatur, maka <function-name> digunakan sebagai nama himpunan data target. |
query Pernyataan SQL Spark yang mengembalikan Spark Dataset atau Koalas DataFrame. Gunakan dlt.read() atau spark.table() untuk melakukan pembacaan lengkap dari himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table() untuk membaca dari himpunan data yang ditentukan dalam alur yang sama, tambahkan kata kunci LIVE ke nama himpunan data dalam argumen fungsi. Misalnya, untuk membaca dari himpunan data bernama customers :spark.table("LIVE.customers") Anda juga dapat menggunakan fungsi spark.table() untuk membaca dari tabel yang terdaftar di metastore dengan menghilangkan kata kunci LIVE dan secara opsional mengkualifikasikan nama tabel dengan nama database:spark.table("sales.customers") Gunakan dlt.read_stream() untuk melakukan pembacaan streaming dari himpunan data yang ditentukan dalam alur yang sama.Gunakan fungsi spark.sql untuk menentukan kueri SQL guna membuat himpunan data kembali.Gunakan sintaks PySpark untuk menentukan kueri Delta Live Tables dengan Python. |
Ekspektasi |
---|
@expect("description", "constraint") Deklarasikan batasan kualitas data yang diidentifikasi oleh description . Jika baris melanggar harapan, sertakan baris dalam himpunan data target. |
@expect_or_drop("description", "constraint") Deklarasikan batasan kualitas data yang diidentifikasi oleh description . Jika baris melanggar harapan, jatuhkan baris dari himpunan data target. |
@expect_or_fail("description", "constraint") Deklarasikan batasan kualitas data yang diidentifikasi oleh description . Jika berturut-turut melanggar harapan, segera hentikan eksekusi. |
@expect_all(expectations) Deklarasikan satu atau lebih batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, sertakan baris dalam himpunan data target. |
@expect_all_or_drop(expectations) Deklarasikan satu atau lebih batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, hapus baris dari himpunan data target. |
@expect_all_or_fail(expectations) Deklarasikan satu atau lebih batasan kualitas data. expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, segera hentikan eksekusi. |
Mengubah pengambilan data dengan Python di Delta Live Tables
Gunakan fungsi apply_changes()
di API Python untuk menggunakan fungsi Delta Live Tables CDC. Antarmuka Delta Live Tables Python juga menyediakan fungsi create_streaming_table(). Anda dapat menggunakan fungsi ini untuk membuat tabel target yang diperlukan oleh fungsi apply_changes()
.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Catatan
Perilaku default untuk INSERT
dan UPDATE
peristiwa adalah untuk upsert peristiwa CDC dari sumber: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE
peristiwa dapat ditentukan dengan kondisi tersebut APPLY AS DELETE WHEN
.
Penting
Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes
, Anda juga harus menyertakan kolom __START_AT
dan __END_AT
dengan jenis data yang sama dengan bidang sequence_by
.
Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta.
Argumen |
---|
target Jenis: str Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan apply_changes() fungsi.Parameter ini diperlukan. |
source Jenis: str Sumber data yang berisi catatan CDC. Parameter ini diperlukan. |
keys Jenis: list Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk catatan tertentu dalam tabel target. Anda dapat menentukan: * Daftar untai (karakter): ["userId", "orderId"] * Daftar fungsi Spark SQL col() fungsi: [col("userId"), col("orderId"] Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini diperlukan. |
sequence_by Jenis: str atau col() Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Delta Live Tables menggunakan pengurutan ini untuk menangani peristiwa perubahan yang tidak sesuai pesanan. Anda dapat menentukan: * Untai (karakter): "sequenceNum" * Spark SQL col() fungsi: col("sequenceNum") Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini diperlukan. |
ignore_null_updates Jenis: bool Izinkan menyerap pembaruan yang berisi subset kolom target. Ketika peristiwa CDC cocok dengan baris yang ada dan ignore_null_updates adalah True , kolom dengan null akan mempertahankan nilai yang ada di target. Ini juga berlaku untuk kolom bersarang dengan nilai null . Ketika ignore_null_updates adalah False , nilai yang ada akan ditimpa dengan null nilai.Parameter ini bersifat opsional. Default adalah False . |
apply_as_deletes Jenis: str atau expr() Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert. Untuk menangani data yang tidak sesuai urutan, baris yang dihapus untuk sementara disimpan sebagai penanda di tabel Delta yang mendasarinya, dan tampilan dibuat di metastore yang menyaring penanda ini. Interval retensi dapat dikonfigurasi denganpipelines.cdc.tombstoneGCThresholdInSeconds properti tabel.Anda dapat menentukan: * Untai (karakter): "Operation = 'DELETE'" * Spark SQL expr() fungsi: expr("Operation = 'DELETE'") Parameter ini bersifat opsional. |
apply_as_truncates Jenis: str atau expr() Menentukan kapan acara CDC harus diperlakukan sebagai tabel lengkap TRUNCATE . Karena klausa ini memicu pemotongan penuh tabel target, klausa ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.Parameter apply_as_truncates hanya didukung untuk SCD jenis 1. SCD jenis 2 tidak mendukung pemotongan.Anda dapat menentukan: * Untai (karakter): "Operation = 'TRUNCATE'" * Spark SQL expr() fungsi: expr("Operation = 'TRUNCATE'") Parameter ini bersifat opsional. |
column_list except_column_list Jenis: list Subset kolom untuk disertakan dalam tabel target. Gunakan column_list untuk menentukan daftar lengkap kolom untuk disertakan. Gunakan except_column_list untuk menentukan kolom yang akan dikecualikan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() :* column_list = ["userId", "name", "city"] .* column_list = [col("userId"), col("name"), col("city")] * except_column_list = ["operation", "sequenceNum"] * except_column_list = [col("operation"), col("sequenceNum") Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini bersifat opsional. Defaultnya adalah memasukkan semua kolom dalam tabel target saat tidak ada column_list atau except_column_list argumen diteruskan ke fungsi. |
stored_as_scd_type Jenis: str atau int Apakah Anda akan menyimpan rekaman sebagai SCD jenis 1 atau SCD jenis 2. Atur ke 1 untuk SCD jenis 1 atau 2 untuk SCD jenis 2.Klausa ini opsional. Defaultnya adalah SCD jenis 1. |
track_history_column_list track_history_except_column_list Jenis: list Subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Menggunakantrack_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() : - track_history_column_list = ["userId", "name", "city"] . - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId) , tetapi Anda tidak dapat menggunakan col(source.userId) .Parameter ini bersifat opsional. Defaultnya adalah menyertakan semua kolom dalam tabel target saat tidak ada track_history_column_list atautrack_history_except_column_list argumen diteruskan ke fungsi. |
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk