Referensi bahasa Python Delta Live Tables

Artikel ini menyediakan detail untuk antarmuka pemrograman Python Tabel Langsung Delta.

Untuk informasi tentang API SQL, lihat referensi bahasa SQL Delta Live Tables.

Untuk detail khusus untuk mengonfigurasi Auto Loader, lihat Apa itu Auto Loader?.

Batasan

Antarmuka Python Tabel Langsung Delta memiliki batasan berikut:

  • Python table dan view fungsi harus mengembalikan DataFrame. Beberapa fungsi yang beroperasi pada DataFrames tidak mengembalikan DataFrames dan tidak boleh digunakan. Karena transformasi DataFrame dijalankan setelah grafik aliran data lengkap diselesaikan, menggunakan operasi tersebut mungkin memiliki efek samping yang tidak diinginkan. Operasi ini mencakup fungsi seperti collect(), , count(), toPandas()save(), dan saveAsTable(). Namun, Anda dapat menyertakan fungsi-fungsi ini di luar table definisi fungsi atau view karena kode ini dijalankan sekali selama fase inisialisasi grafik.
  • Fungsi pivot() tidak didukung. pivot Operasi di Spark memerlukan pemuatan data input yang bersemangat untuk menghitung skema output. Kemampuan ini tidak didukung dalam Tabel Langsung Delta.

dlt Mengimpor modul Python

Delta Live Tables Fungsi Python didefinisikan dalam modul dlt. Alur Anda yang diimplementasikan dengan API Python harus mengimpor modul ini:

import dlt

Membuat tampilan terwujud atau tabel streaming Tabel Langsung Delta

Di Python, Tabel Langsung Delta menentukan apakah akan memperbarui himpunan data sebagai tampilan materialisasi atau tabel streaming berdasarkan kueri yang menentukan. Dekorator @table digunakan untuk menentukan tampilan materialisasi dan tabel streaming.

Untuk menentukan tampilan materialisasi di Python, terapkan @table ke kueri yang melakukan baca statis terhadap sumber data. Untuk menentukan tabel streaming, terapkan @table ke kueri yang melakukan pembacaan streaming terhadap sumber data. Kedua jenis himpunan data memiliki spesifikasi sintaks yang sama seperti berikut:

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Membuat tampilan Tabel Langsung Delta

Untuk menentukan tampilan dengan Python, terapkan dekorator @view. Seperti dekorator @table , Anda dapat menggunakan tampilan di Tabel Langsung Delta untuk himpunan data statis atau streaming. Berikut ini adalah sintaks untuk menentukan tampilan dengan Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Contoh: Menentukan tabel dan tampilan

Untuk menentukan tabel atau tampilan di Python, terapkan @dlt.view atau @dlt.table dekorator ke fungsi. Anda dapat menggunakan nama fungsi atau parameter name untuk menetapkan nama tabel atau tampilan. Contoh berikut mendefinisikan dua himpunan data yang berbeda: tampilan bernama taxi_raw yang menggunakan file JSON sebagai sumber input dan tabel bernama filtered_data yang menggunakan tampilan taxi_raw sebagai input:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Contoh: Mengakses himpunan data yang ditentukan dalam alur yang sama

Selain membaca dari sumber data eksternal, Anda dapat mengakses himpunan data yang ditentukan dalam alur yang sama dengan fungsi Tabel read() Langsung Delta. Contoh berikut menunjukkan pembuatan himpunan customers_filteredread() data menggunakan fungsi :

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Anda juga dapat menggunakan spark.table() fungsi untuk mengakses himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table() untuk mengakses himpunan data yang ditentukan dalam alur, dalam argumen fungsi tambahkan kata kunci LIVE ke nama himpunan data:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Contoh: Membaca dari tabel yang terdaftar di metastore

Untuk membaca data dari tabel yang terdaftar di metastore Apache Hive, dalam argumen fungsi hilangkan LIVE kata kunci dan secara opsional memenuhi syarat nama tabel dengan nama database:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Untuk contoh membaca dari tabel Katalog Unity, lihat Menyerap data ke dalam alur Katalog Unity.

Contoh: Mengakses himpunan data menggunakan spark.sql

Anda juga dapat mengembalikan himpunan data menggunakan ekspresi spark.sql dalam fungsi kueri. Untuk membaca dari himpunan data internal, tambahkan LIVE. ke nama himpunan data:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Membuat tabel untuk digunakan sebagai target operasi streaming

create_streaming_table() Gunakan fungsi untuk membuat tabel target untuk output rekaman berdasarkan operasi streaming, termasuk apply_changes() dan rekaman output @append_flow.

Catatan

Fungsi create_target_table() dan create_streaming_live_table() tidak digunakan lagi. Databricks merekomendasikan pembaruan kode yang telah ada untuk menggunakan fungsi create_streaming_table().

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"}
)
Argumen
name

Jenis: str

Nama tabel.

Parameter ini diperlukan.
comment

Jenis: str

Deskripsi opsional untuk tabel.
spark_conf

Jenis: dict

Daftar opsional konfigurasi Spark untuk eksekusi kueri ini.
table_properties

Jenis: dict

Daftar opsional properti tabel untuk tabel.
partition_cols

Jenis: array

Daftar opsional dari satu atau beberapa kolom yang digunakan untuk mempartisi tabel.
path

Jenis: str

Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan default ke lokasi penyimpanan alur.
schema

Jenis: str atau StructType

Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL, atau dengan Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Jenis: dict

Batasan kualitas data opsional untuk tabel. Lihat beberapa ekspektasi.

Mengontrol bagaimana tabel diwujudkan

Tabel juga menawarkan kontrol tambahan atas materialisasinya:

  • Tentukan bagaimana tabel dipartisi menggunakan partition_cols. Anda dapat menggunakan partisi untuk mempercepat kueri.
  • Anda dapat mengatur properti tabel saat Anda menentukan tampilan atau tabel. Lihat Properti tabel Tabel Langsung Delta.
  • Atur lokasi penyimpanan untuk data tabel menggunakan pengaturan path. Secara default, data tabel disimpan di lokasi penyimpanan alur jika path tidak diatur.
  • Anda dapat menggunakan kolom yang dihasilkan dalam definisi skema Anda. Lihat Contoh: Menentukan skema dan kolom partisi.

Catatan

Untuk tabel berukuran kurang dari 1 TB, Databricks merekomendasikan untuk mengizinkan Delta Live Table mengontrol organisasi data. Kecuali Anda mengharapkan tabel Anda tumbuh di luar terabyte, Anda umumnya tidak boleh menentukan kolom partisi.

Contoh: Tentukan skema dan kolom partisi

Anda dapat secara opsional menentukan skema tabel menggunakan Python StructType atau untai (karakter) SQL DDL. Ketika ditentukan dengan string DDL, definisi dapat menyertakan kolom yang dihasilkan.

Contoh berikut membuat tabel yang disebut sales dengan skema yang ditentukan menggunakan Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

Contoh berikut menentukan skema untuk tabel menggunakan string DDL, menentukan kolom yang dihasilkan, dan menentukan kolom partisi:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Secara default, Delta Live Tables menyimpulkan skema dari definisi table jika Anda tidak menentukan skema.

Mengonfigurasi tabel streaming untuk mengabaikan perubahan dalam tabel streaming sumber

Catatan

  • skipChangeCommits Bendera hanya berfungsi dengan spark.readStream menggunakan option() fungsi . Anda tidak dapat menggunakan bendera ini dalam fungsi dlt.read_stream() .
  • Anda tidak dapat menggunakan skipChangeCommits bendera saat tabel streaming sumber didefinisikan sebagai target fungsi apply_changes().

Secara default, tabel streaming memerlukan sumber khusus tambahan. Saat tabel streaming menggunakan tabel streaming lain sebagai sumber, dan tabel streaming sumber memerlukan pembaruan atau penghapusan, misalnya, pemrosesan GDPR "hak untuk dilupakan", skipChangeCommits bendera dapat diatur saat membaca tabel streaming sumber untuk mengabaikan perubahan tersebut. Untuk informasi selengkapnya tentang bendera ini, lihat Mengabaikan pembaruan dan penghapusan.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Properti Tabel Langsung Python Delta

Tabel berikut ini menjelaskan opsi dan properti yang bisa Anda tentukan saat menentukan tabel dan tampilan dengan Tabel Langsung Delta:

@table atau @view
name

Jenis: str

Nama opsional untuk tabel atau tampilan. Jika tidak ditentukan, nama fungsi digunakan sebagai nama tabel atau tampilan.
comment

Jenis: str

Deskripsi opsional untuk tabel.
spark_conf

Jenis: dict

Daftar opsional konfigurasi Spark untuk eksekusi kueri ini.
table_properties

Jenis: dict

Daftar opsional properti tabel untuk tabel.
path

Jenis: str

Lokasi penyimpanan opsional untuk data tabel. Jika tidak diatur, sistem akan default ke lokasi penyimpanan alur.
partition_cols

Jenis: a collection of str

Koleksi opsional, misalnya, list, dari satu atau beberapa kolom yang akan digunakan untuk mempartisi tabel.
schema

Jenis: str atau StructType

Definisi skema opsional untuk tabel. Skema dapat didefinisikan sebagai string SQL DDL, atau dengan Python
StructType.
temporary

Jenis: bool

Buat tabel tetapi jangan terbitkan metadata untuk tabel. Kata temporary kunci menginstruksikan Tabel Langsung Delta untuk membuat tabel yang tersedia untuk alur tetapi tidak boleh diakses di luar alur. Untuk mengurangi waktu pemrosesan, tabel sementara bertahan selama masa pakai alur yang membuatnya, dan bukan hanya satu pembaruan.

Defaultnya adalah false.
Definisi tabel atau tampilan
def <function-name>()

Fungsi Python yang mendefinisikan himpunan data. Jika parameter name tidak diatur, maka <function-name> digunakan sebagai nama himpunan data target.
query

Pernyataan SQL Spark yang mengembalikan Spark Dataset atau Koalas DataFrame.

Gunakan dlt.read() atau spark.table() untuk melakukan pembacaan lengkap dari himpunan data yang ditentukan dalam alur yang sama. Saat menggunakan fungsi spark.table() untuk membaca dari himpunan data yang ditentukan dalam alur yang sama, tambahkan kata kunci LIVE ke nama himpunan data dalam argumen fungsi. Misalnya, untuk membaca dari himpunan data bernama customers:

spark.table("LIVE.customers")

Anda juga dapat menggunakan fungsi spark.table() untuk membaca dari tabel yang terdaftar di metastore dengan menghilangkan kata kunci LIVE dan secara opsional mengkualifikasikan nama tabel dengan nama database:

spark.table("sales.customers")

Gunakan dlt.read_stream() untuk melakukan pembacaan streaming dari himpunan data yang ditentukan dalam alur yang sama.

Gunakan fungsi spark.sql untuk menentukan kueri SQL guna membuat himpunan data kembali.

Gunakan sintaks PySpark untuk menentukan kueri Delta Live Tables dengan Python.
Ekspektasi
@expect("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika baris melanggar harapan, sertakan baris dalam himpunan data target.
@expect_or_drop("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika baris melanggar harapan, jatuhkan baris dari himpunan data target.
@expect_or_fail("description", "constraint")

Deklarasikan batasan kualitas data yang diidentifikasi oleh
description. Jika berturut-turut melanggar harapan, segera hentikan eksekusi.
@expect_all(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, sertakan baris dalam himpunan data target.
@expect_all_or_drop(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, hapus baris dari himpunan data target.
@expect_all_or_fail(expectations)

Deklarasikan satu atau lebih batasan kualitas data.
expectations adalah kamus Python, di mana kuncinya adalah deskripsi ekspektasi dan nilainya adalah batasan ekspektasi. Jika baris melanggar salah satu harapan, segera hentikan eksekusi.

Mengubah pengambilan data dengan Python di Delta Live Tables

Gunakan fungsi apply_changes() di API Python untuk menggunakan fungsi Delta Live Tables CDC. Antarmuka Delta Live Tables Python juga menyediakan fungsi create_streaming_table(). Anda dapat menggunakan fungsi ini untuk membuat tabel target yang diperlukan oleh fungsi apply_changes().

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Catatan

Perilaku default untuk INSERT dan UPDATE peristiwa adalah untuk upsert peristiwa CDC dari sumber: memperbarui baris apa pun dalam tabel target yang cocok dengan kunci yang ditentukan atau menyisipkan baris baru saat rekaman yang cocok tidak ada di tabel target. Penanganan untuk DELETE peristiwa dapat ditentukan dengan kondisi tersebut APPLY AS DELETE WHEN.

Penting

Anda harus mendeklarasikan tabel streaming target untuk menerapkan perubahan. Anda dapat secara opsional menentukan skema untuk tabel target Anda. Saat menentukan skema tabel target apply_changes, Anda juga harus menyertakan kolom __START_AT dan __END_AT dengan jenis data yang sama dengan bidang sequence_by.

Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta.

Argumen
target

Jenis: str

Nama tabel yang akan diperbarui. Anda dapat menggunakan fungsi create_streaming_table() untuk membuat tabel target sebelum menjalankan apply_changes() fungsi.

Parameter ini diperlukan.
source

Jenis: str

Sumber data yang berisi catatan CDC.

Parameter ini diperlukan.
keys

Jenis: list

Kolom atau kombinasi kolom yang secara unik mengidentifikasi baris dalam data sumber. Ini digunakan untuk mengidentifikasi peristiwa CDC mana yang berlaku untuk catatan tertentu dalam tabel target.

Anda dapat menentukan:

* Daftar untai (karakter): ["userId", "orderId"]
* Daftar fungsi Spark SQLcol() fungsi: [col("userId"), col("orderId"]

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini diperlukan.
sequence_by

Jenis: str atau col()

Nama kolom yang menentukan urutan logis peristiwa CDC dalam data sumber. Delta Live Tables menggunakan pengurutan ini untuk menangani peristiwa perubahan yang tidak sesuai pesanan.

Anda dapat menentukan:

* Untai (karakter): "sequenceNum"
* Spark SQL col() fungsi: col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini diperlukan.
ignore_null_updates

Jenis: bool

Izinkan menyerap pembaruan yang berisi subset kolom target. Ketika peristiwa CDC cocok dengan baris yang ada dan ignore_null_updates adalah True, kolom dengan null akan mempertahankan nilai yang ada di target. Ini juga berlaku untuk kolom bersarang dengan nilai null. Ketika ignore_null_updates adalah False, nilai yang ada akan ditimpa dengan null nilai.

Parameter ini bersifat opsional.

Default adalah False.
apply_as_deletes

Jenis: str atau expr()

Menentukan kapan peristiwa CDC harus diperlakukan sebagai DELETE bukan upsert. Untuk menangani data yang tidak sesuai urutan, baris yang dihapus untuk sementara disimpan sebagai penanda di tabel Delta yang mendasarinya, dan tampilan dibuat di metastore yang menyaring penanda ini. Interval retensi dapat dikonfigurasi dengan
pipelines.cdc.tombstoneGCThresholdInSecondsproperti tabel.

Anda dapat menentukan:

* Untai (karakter): "Operation = 'DELETE'"
* Spark SQL expr() fungsi: expr("Operation = 'DELETE'")

Parameter ini bersifat opsional.
apply_as_truncates

Jenis: str atau expr()

Menentukan kapan acara CDC harus diperlakukan sebagai tabel lengkap TRUNCATE. Karena klausa ini memicu pemotongan penuh tabel target, klausa ini harus digunakan hanya untuk kasus penggunaan tertentu yang memerlukan fungsionalitas ini.

Parameter apply_as_truncates hanya didukung untuk SCD jenis 1. SCD jenis 2 tidak mendukung pemotongan.

Anda dapat menentukan:

* Untai (karakter): "Operation = 'TRUNCATE'"
* Spark SQL expr() fungsi: expr("Operation = 'TRUNCATE'")

Parameter ini bersifat opsional.
column_list

except_column_list

Jenis: list

Subset kolom untuk disertakan dalam tabel target. Gunakan column_list untuk menentukan daftar lengkap kolom untuk disertakan. Gunakan except_column_list untuk menentukan kolom yang akan dikecualikan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col():

* column_list = ["userId", "name", "city"].
* column_list = [col("userId"), col("name"), col("city")]
* except_column_list = ["operation", "sequenceNum"]
* except_column_list = [col("operation"), col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini bersifat opsional.

Defaultnya adalah memasukkan semua kolom dalam tabel target saat tidak ada column_list atau except_column_list argumen diteruskan ke fungsi.
stored_as_scd_type

Jenis: str atau int

Apakah Anda akan menyimpan rekaman sebagai SCD jenis 1 atau SCD jenis 2.

Atur ke 1 untuk SCD jenis 1 atau 2 untuk SCD jenis 2.

Klausa ini opsional.

Defaultnya adalah SCD jenis 1.
track_history_column_list

track_history_except_column_list

Jenis: list

Subset kolom output yang akan dilacak untuk riwayat dalam tabel target. Gunakan track_history_column_list untuk menentukan daftar lengkap kolom yang akan dilacak. Menggunakan
track_history_except_column_list untuk menentukan kolom yang akan dikecualikan dari pelacakan. Anda dapat mendeklarasikan nilai sebagai daftar string atau sebagai fungsi Spark SQL col() : - track_history_column_list = ["userId", "name", "city"]. - track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumen untuk col() fungsi tidak dapat mencakup kualifikasi. Misalnya, Anda dapat menggunakan col(userId), tetapi Anda tidak dapat menggunakan col(source.userId).

Parameter ini bersifat opsional.

Defaultnya adalah menyertakan semua kolom dalam tabel target saat tidak ada track_history_column_list atau
track_history_except_column_list argumen diteruskan ke fungsi.