Bagikan melalui


Pembacaan dan penulisan streaming tabel Delta

Delta Lake sangat terintegrasi dengan Spark Structured Streaming melalui readStream dan writeStream. Delta Lake mengatasi banyak keterbatasan yang biasanya terkait dengan sistem dan file streaming, termasuk:

  • Menyatukan file kecil yang dihasilkan oleh penyerapan latensi rendah.
  • Mempertahankan pemrosesan "persis sekali" dengan lebih dari satu aliran (atau pekerjaan batch bersamaan).
  • Menemukan file mana yang baru secara efisien saat menggunakan file sebagai sumber untuk aliran.

Catatan

Artikel ini menjelaskan penggunaan tabel Delta Lake sebagai sumber streaming dan sink. Untuk mempelajari cara memuat data menggunakan tabel streaming di Databricks SQL, lihat Memuat data menggunakan tabel streaming di Databricks SQL.

Untuk informasi tentang gabungan statis aliran dengan Delta Lake, lihat Gabungan statis aliran.

Tabel Delta sebagai sumber

Streaming Terstruktur secara bertahap membaca tabel Delta. Saat kueri streaming aktif terhadap tabel Delta, rekaman baru diproses secara idempotensi saat versi tabel baru diterapkan ke tabel sumber.

Contoh kode berikut menunjukkan mengonfigurasi bacaan streaming menggunakan nama tabel atau jalur file.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Penting

Jika skema untuk tabel Delta berubah setelah pembacaan streaming dimulai terhadap tabel, kueri gagal. Untuk sebagian besar perubahan skema, Anda dapat memulai ulang stream untuk mengatasi ketidakcocokan skema dan melanjutkan pemrosesan.

Di Databricks Runtime 12.2 LTS dan di bawahnya, Anda tidak dapat melakukan streaming dari tabel Delta dengan pemetaan kolom diaktifkan yang telah mengalami evolusi skema non-aditif seperti mengganti nama atau menghilangkan kolom. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.

Batasi laju input

Opsi berikut tersedia untuk mengontrol batch mikro:

  • maxFilesPerTrigger: Jumlah file baru yang akan dipertimbangkan di setiap batch mikro. Defaultnya adalah 1000.
  • maxBytesPerTrigger: Jumlah data yang diproses di setiap batch mikro. Opsi ini menetapkan "maks lunak", yang berarti batch memproses kira-kira sejumlah data ini dan dapat memproses lebih dari batas untuk membuat kueri streaming bergerak maju, dalam kasus ketika unit input terkecil lebih besar dari batas ini. Ini tidak diatur secara default.

Jika Anda menggunakan maxBytesPerTrigger bersama dengan maxFilesPerTrigger, batch mikro memproses data hingga batas maxFilesPerTrigger atau maxBytesPerTrigger tercapai.

Catatan

Dalam kasus ketika transaksi tabel sumber dibersihkan karena logRetentionDuration konfigurasi dan kueri streaming mencoba memproses versi tersebut, secara default kueri gagal menghindari kehilangan data. Anda dapat mengatur opsi failOnDataLoss untuk false mengabaikan data yang hilang dan melanjutkan pemrosesan.

Mengalirkan umpan pengambilan data perubahan Delta Lake (CDC)

Delta Lake mengubah rekaman umpan data berubah menjadi tabel Delta, termasuk pembaruan dan penghapusan. Saat diaktifkan, Anda dapat melakukan streaming dari umpan data perubahan dan menulis logika untuk memproses penyisipan, pembaruan, dan penghapusan ke dalam tabel hilir. Meskipun mengubah output data umpan data sedikit berbeda dari tabel Delta yang dijelaskannya, ini menyediakan solusi untuk menyebarkan perubahan bertahap pada tabel hilir dalam arsitektur medali.

Penting

Di Databricks Runtime 12.2 LTS ke bawah, Anda tidak dapat melakukan streaming dari umpan data perubahan untuk tabel Delta dengan pemetaan kolom diaktifkan yang telah mengalami evolusi skema non-aditif seperti mengganti nama atau menghilangkan kolom. Untuk detailnya, lihat Streaming dengan pemetaan kolom dan perubahan skema.

Mengabaikan pembaruan dan penghapusan

Streaming Terstruktur tidak menangani input yang bukan penambahan dan memberikan pengecualian jika ada modifikasi yang terjadi pada tabel yang digunakan sebagai sumber. Ada dua strategi utama untuk menangani perubahan yang tidak dapat secara otomatis disebarkan ke hilir:

  • Anda dapat menghapus output dan titik pemeriksaan dan memulai ulang aliran dari awal.
  • Anda dapat mengatur salah satu dari dua opsi ini:
    • ignoreDeletes: mengabaikan transaksi yang menghapus data pada batas partisi.
    • skipChangeCommits: abaikan transaksi yang menghapus atau memodifikasi rekaman yang ada. skipChangeCommits subsum ignoreDeletes.

Catatan

Di Databricks Runtime 12.2 LTS ke atas, skipChangeCommits hentikan pengaturan ignoreChangessebelumnya . Di Databricks Runtime 11.3 LTS dan yang lebih rendah, ignoreChanges adalah satu-satunya opsi yang didukung.

Semantik untuk ignoreChanges sangat berbeda dari skipChangeCommits. Dengan ignoreChanges diaktifkan, file data yang ditulis ulang dalam tabel sumber dipancarkan kembali setelah operasi perubahan data seperti UPDATE, MERGE INTO, DELETE (dalam partisi), atau OVERWRITE. Baris yang tidak berubah sering dipancarkan bersama baris baru, jadi konsumen hilir harus dapat menangani duplikat. Penghapusan tidak disebarkan ke hilir. ignoreChanges subsum ignoreDeletes.

skipChangeCommits mengabaikan seluruh operasi pengubahan file. File data yang ditulis ulang dalam tabel sumber karena operasi perubahan data seperti UPDATE, MERGE INTO, DELETE, dan OVERWRITE seluruhnya diabaikan. Untuk mencerminkan perubahan dalam tabel sumber upstream, Anda harus menerapkan logika terpisah untuk menyebarkan perubahan ini.

Beban kerja yang dikonfigurasi dengan ignoreChanges terus beroperasi menggunakan semantik yang diketahui, tetapi Databricks merekomendasikan penggunaan skipChangeCommits untuk semua beban kerja baru. Memigrasikan beban kerja menggunakan ignoreChanges untuk skipChangeCommits memerlukan logika refaktor.

Contoh

Contohnya, misalnya Anda memiliki tabel user_events dengan date, user_email, dan action yang dipartisi oleh date. Anda melakukan streaming keluar dari tabel user_events dan Anda perlu menghapus data dari tabel tersebut karena GDPR.

Jika Anda menghapus pada batas-batas partisi (yaitu, WHERE ada di kolom partisi), file sudah tersegmentasi berdasarkan nilai sehingga penghapusan hanya menghilangkan file tersebut dari metadata. Saat menghapus seluruh partisi data, Anda bisa menggunakan yang berikut ini:

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

Jika Anda menghapus data dalam beberapa partisi (dalam contoh ini, pemfilteran pada user_email), gunakan sintaks berikut:

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

Jika Anda memperbarui dengan user_email pernyataan UPDATE, file berisi user_email yang dimaksud akan ditulis ulang. Gunakan skipChangeCommits untuk mengabaikan file data yang diubah.

Tentukan posisi awal

Anda dapat menggunakan opsi berikut untuk menentukan titik awal sumber streaming Delta Lake tanpa memproses seluruh tabel.

  • startingVersion: Versi Delta Lake untuk memulai. Databricks merekomendasikan untuk menghilangkan opsi ini untuk sebagian besar beban kerja. Saat tidak diatur, streaming dimulai dari versi terbaru yang tersedia termasuk rekam jepret lengkap tabel pada saat itu.

    Jika ditentukan, aliran membaca semua perubahan pada tabel Delta yang dimulai dengan versi yang ditentukan (inklusif). Jika versi yang ditentukan tidak lagi tersedia, aliran gagal dimulai. Anda dapat memperoleh versi commit dari kolom version pada output perintah DESCRIBE HISTORY.

    Untuk mengembalikan hanya perubahan terbaru, tentukan latest.

  • startingTimestamp: Stempel waktu untuk memulai. Semua perubahan tabel yang dilakukan pada atau setelah tanda waktu (inklusif) dibaca oleh pembaca streaming. Jika tanda waktu yang disediakan mendahului semua penerapan tabel, bacaan streaming dimulai dengan tanda waktu paling awal yang tersedia. Salah satu:

    • String stempel waktu. Contohnya,"2019-01-01T00:00:00.000Z".
    • String tanggal. Contohnya,"2019-01-01".

Anda tidak dapat mengatur kedua opsi secara bersamaan. Kedua opsi tersebut berlaku hanya ketika memulai kueri streaming baru. Jika kueri streaming telah dimulai dan kemajuan telah dicatat di titik pemeriksaannya, opsi ini akan diabaikan.

Penting

Meskipun Anda dapat memulai sumber streaming dari versi atau stempel waktu tertentu, skema sumber streaming selalu merupakan skema terbaru dari tabel Delta. Anda harus memastikan tidak ada perubahan skema yang tidak kompatibel dengan tabel Delta setelah versi atau stempel waktu yang ditentukan. Jika tidak, sumber streaming dapat mengembalikan hasil yang salah saat membaca data dengan skema yang salah.

Contoh

Sebagai contoh, misalnya Anda memiliki tabel user_events. Jika Anda ingin membaca perubahan sejak versi 5, gunakan:

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

Jika Anda ingin membaca perubahan sejak versi 2018-10-18, gunakan:

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

Proses snapshot awal tanpa data dihilangkan

Catatan

Fitur ini tersedia di Databricks Runtime 11.3 LTS ke atas. Fitur ini ada di Pratinjau Publik.

Saat menggunakan tabel Delta sebagai sumber aliran, kueri terlebih dahulu memproses semua data yang ada dalam tabel. Tabel Delta pada versi ini disebut snapshot awal. Secara default, file data tabel Delta diproses berdasarkan file mana yang terakhir diubah. Namun, waktu modifikasi terakhir tidak selalu mewakili urutan waktu peristiwa rekaman.

Dalam kueri streaming stateful dengan marka air yang ditentukan, memproses file dengan waktu modifikasi dapat mengakibatkan rekaman diproses dalam urutan yang salah. Ini dapat menyebabkan rekaman menurun sebagai peristiwa terlambat oleh marka air.

Anda dapat menghindari masalah penurunan data dengan mengaktifkan opsi berikut:

  • withEventTimeOrder: Apakah snapshot awal harus diproses dengan urutan waktu peristiwa.

Dengan urutan waktu peristiwa diaktifkan, rentang waktu peristiwa data snapshot awal dibagi menjadi wadah waktu. Setiap batch mikro memproses wadah dengan memfilter data dalam rentang waktu. Opsi konfigurasi maxFilesPerTrigger dan maxBytesPerTrigger masih berlaku untuk mengontrol ukuran microbatch tetapi hanya dengan cara yang diperkirakan karena sifat pemrosesan.

Grafik di bawah ini menunjukkan proses ini:

Snapshot Awal

Informasi penting tentang fitur ini:

  • Masalah penurunan data hanya terjadi ketika rekam jepret Delta awal dari kueri streaming stateful diproses dalam urutan default.
  • Anda tidak dapat mengubah withEventTimeOrder setelah kueri aliran dimulai saat snapshot awal masih diproses. Untuk memulai ulang dengan perubahan withEventTimeOrder, Anda perlu menghapus titik pemeriksaan.
  • Jika Anda menjalankan kueri aliran dengan withEventTimeOrder diaktifkan, Anda tidak dapat menurunkannya ke versi DBR yang tidak mendukung fitur ini hingga pemrosesan snapshot awal selesai. Jika Anda perlu menurunkan tingkat, Anda dapat menunggu snapshot awal selesai, atau menghapus titik pemeriksaan dan memulai ulang kueri.
  • Fitur ini tidak didukung dalam skenario yang jarang berikut:
    • Kolom waktu peristiwa adalah kolom yang dihasilkan dan ada transformasi non-proyeksi antara sumber Delta dan marka air.
    • Ada marka air yang memiliki lebih dari satu sumber Delta dalam kueri aliran.
  • Dengan urutan waktu peristiwa diaktifkan, performa pemrosesan snapshot awal Delta mungkin lebih lambat.
  • Setiap batch mikro memindai snapshot awal untuk memfilter data dalam rentang waktu peristiwa yang sesuai. Untuk tindakan filter yang lebih cepat, disarankan untuk menggunakan kolom sumber Delta sebagai waktu kejadian sehingga lompati data dapat diterapkan (periksa Data yang melompati Delta Lake saat berlaku). Selain itu, pemartisian tabel di sepanjang kolom waktu peristiwa dapat mempercepat pemrosesan lebih lanjut. Anda dapat memeriksa Spark UI untuk melihat berapa banyak file delta yang dipindai untuk batch mikro tertentu.

Contoh

Misalkan Anda memiliki tabel user_events dengan kolom event_time. Kueri streaming Anda adalah kueri agregasi. Jika Anda ingin memastikan tidak ada penurunan data selama pemrosesan snapshot awal, Anda dapat menggunakan:

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

Catatan

Anda juga dapat mengaktifkan ini dengan konfigurasi Spark pada kluster yang akan berlaku untuk semua kueri streaming: spark.databricks.delta.withEventTimeOrder.enabled true

Tabel Delta sebagai sink

Anda juga dapat menulis ke tabel Delta menggunakan Structured Streaming. Log transaksi memungkinkan Delta Lake untuk menjamin pemrosesan tepat satu kali, bahkan ketika terdapat aliran atau kueri batch lain yang berjalan bersamaan terhadap tabel.

Catatan

Fungsi Delta Lake VACUUM menghapus semua file yang tidak dikelola oleh Delta Lake tetapi melewati semua direktori yang dimulai dengan _. Anda dapat dengan aman menyimpan titik pemeriksaan bersama data dan metadata lain untuk tabel Delta menggunakan struktur direktori seperti <table-name>/_checkpoints.ggunakan struktur direktori seperti <table-name>/_checkpoints.

Metrik

Anda dapat mengetahui jumlah byte dan file yang belum diproses dalam proses kueri streaming sebagai metrik numBytesOutstanding dan numFilesOutstanding. Metrik tambahan meliputi:

  • numNewListedFiles: Jumlah file Delta Lake yang tercantum untuk menghitung backlog untuk batch ini.
    • backlogEndOffset: Versi tabel yang digunakan untuk menghitung backlog.

Jika Anda menjalankan streaming di buku catatan, Anda bisa melihat metrik ini di bawah tab Data Mentah di dasbor kemajuan kueri streaming:

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Mode penambahan

Secara default, aliran berjalan dalam mode penambahan, yang menambahkan rekaman baru ke tabel.

toTable Gunakan metode saat streaming ke tabel, seperti dalam contoh berikut:

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Mode lengkap

Anda juga dapat menggunakan Structured Streaming untuk mengganti seluruh tabel dengan setiap batch. Satu contoh kasus penggunaan adalah melakukan komputasi ringkasan menggunakan agregasi:

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

Contoh sebelumnya terus memperbarui tabel yang berisi jumlah total peristiwa oleh pelanggan.

Untuk aplikasi dengan persyaratan latensi yang lebih lunak, Anda dapat menghemat sumber daya komputasi dengan pemicu satu kali. Gunakan ini untuk memperbarui tabel agregasi ringkasan pada jadwal tertentu, yang hanya memproses data baru yang telah tiba sejak pembaruan terakhir.

Upsert dari kueri streaming dengan menggunakan foreachBatch

Anda dapat menggunakan kombinasi merge dan foreachBatch untuk menulis upsert kompleks dari kueri streaming ke dalam tabel Delta. Lihat Menggunakan foreachBatch untuk menulis ke sink data arbitrer.

Pola ini memiliki banyak aplikasi, termasuk yang berikut ini:

  • Menulis agregat streaming dalam Mode Pembaruan: Kombinasi ini jauh lebih efisien daripada Mode Lengkap.
  • Menulis aliran perubahan database ke dalam tabel Delta: Kueri gabungan untuk menulis ubah data dapat digunakan di foreachBatch agar terus menerapkan aliran perubahan ke tabel Delta.
  • Menulis aliran data ke dalam tabel Delta dengan deduplikasi: Kueri penggabungan khusus sisipan untuk deduplikasi dapat digunakan foreachBatch untuk terus menulis data (dengan duplikat) ke tabel Delta dengan deduplikasi otomatis.

Catatan

  • Pastikan bahwa pernyataan merge Anda di dalam foreachBatch bersifat idempotent karena penghidupan ulang kueri streaming dapat menerapkan operasi pada batch data yang sama selama beberapa kali.
  • Ketika merge digunakan dalam foreachBatch, kecepatan data input dari kueri streaming (dilaporkan melalui StreamingQueryProgress dan terlihat dalam grafik kecepatan buku catatan) dapat dilaporkan berkali-kali lipat dari kecepatan yang sebenarnya saat data dihasilkan pada sumbernya. Fenomena ini terjadi karena merge membaca data input beberapa kali, sehingga menyebabkan metrik input dikalikan. Jika hal ini merupakan penyempitan, Anda dapat melakukan cache DataFrame batch sebelum merge dan kemudian membatalkan cache setelah merge.

Contoh berikut menunjukkan bagaimana Anda dapat menggunakan SQL dalam foreachBatch untuk menyelesaikan tugas ini:

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Anda juga dapat memilih untuk menggunakan API Delta Lake untuk melakukan upsert streaming, seperti dalam contoh berikut:

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

Tabel idempotensi menulis di foreachBatch

Catatan

Databricks merekomendasikan untuk mengonfigurasi penulisan streaming terpisah untuk setiap sink yang ingin Anda perbarui. Menggunakan foreachBatch untuk menulis ke beberapa tabel yang menserialisasikan penulisan, yang mengurangi paralelizaiton dan meningkatkan latensi keseluruhan.

Tabel Delta mendukung opsi berikut DataFrameWriter untuk membuat penulisan ke beberapa tabel dalam foreachBatch idempotensi:

  • txnAppId: String unik yang dapat Anda teruskan pada setiap penulisan DataFrame. Misalnya, Anda dapat menggunakan ID StreamingQuery sebagai txnAppId.
  • txnVersion: Jumlah yang meningkat secara monoton yang bertindak sebagai versi transaksi.

Delta Lake menggunakan kombinasi txnAppId dan txnVersion untuk mengidentifikasi duplikat penulisan dan mengabaikannya.

Jika penulisan batch terganggu dengan kegagalan, menjalankan kembali batch menggunakan aplikasi dan ID batch yang sama untuk membantu runtime mengidentifikasi penulisan duplikat dengan benar dan mengabaikannya. ID Aplikasi (txnAppId) dapat berupa string unik buatan pengguna dan tidak harus terkait dengan ID aliran. Lihat Menggunakan foreachBatch untuk menulis ke sink data arbitrer.

Peringatan

Jika Anda menghapus titik pemeriksaan streaming dan memulai ulang kueri dengan titik pemeriksaan baru, Anda harus menyediakan yang berbeda txnAppId. Titik pemeriksaan baru dimulai dengan ID batch .0 Delta Lake menggunakan ID batch dan txnAppId sebagai kunci unik, dan melompati batch dengan nilai yang sudah terlihat.

Contoh kode berikut menunjukkan pola ini:

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}