Mengubah data dengan Tabel Langsung Delta

Artikel ini menjelaskan cara menggunakan Tabel Langsung Delta untuk mendeklarasikan transformasi pada himpunan data dan menentukan bagaimana rekaman diproses melalui logika kueri. Ini juga berisi beberapa contoh pola transformasi umum yang dapat berguna saat membangun alur Tabel Langsung Delta.

Anda dapat menentukan himpunan data terhadap kueri apa pun yang mengembalikan DataFrame. Anda dapat menggunakan operasi bawaan Apache Spark, UDF, logika kustom, dan model MLflow sebagai transformasi di alur Tabel Langsung Delta Anda. Setelah data diserap ke dalam alur Delta Live Tables, Anda dapat menentukan himpunan data baru terhadap sumber upstream untuk membuat tabel streaming baru, tampilan materialisasi, dan tampilan.

Untuk mempelajari cara melakukan pemrosesan stateful secara efektif dengan Tabel Langsung Delta, lihat Mengoptimalkan pemrosesan stateful di Tabel Langsung Delta dengan marka air.

Kapan menggunakan tampilan, tampilan materialisasi, dan tabel streaming

Untuk memastikan alur Anda efisien dan dapat dipertahankan, pilih jenis himpunan data terbaik saat Anda menerapkan kueri alur Anda.

Pertimbangkan untuk menggunakan tampilan saat:

  • Anda memiliki kueri besar atau kompleks yang ingin Anda pecahkan ke kueri yang lebih mudah dikelola.
  • Anda ingin memvalidasi hasil perantara menggunakan ekspektasi.
  • Anda ingin mengurangi biaya penyimpanan dan komputasi dan tidak memerlukan materialisasi hasil kueri. Karena tabel terwujud, tabel memerlukan sumber daya komputasi dan penyimpanan tambahan.

Pertimbangkan untuk menggunakan tampilan materialisasi saat:

  • Beberapa kueri hilir menggunakan tabel. Karena tampilan dihitung sesuai permintaan, tampilan dihitung ulang setiap kali tampilan dikueri.
  • Alur, pekerjaan, atau kueri lainnya menggunakan tabel. Karena tampilan tidak terwujud, Anda hanya dapat menggunakannya dalam alur yang sama.
  • Anda ingin menampilkan hasil kueri selama pengembangan. Karena tabel terwujud dan dapat dilihat dan dikueri di luar alur, menggunakan tabel selama pengembangan dapat membantu memvalidasi kebenaran komputasi. Setelah memvalidasi, konversi kueri yang tidak memerlukan materialisasi menjadi tampilan.

Pertimbangkan untuk menggunakan tabel streaming saat:

  • Kueri didefinisikan terhadap sumber data yang terus menerus atau bertambah secara bertahap.
  • Hasil kueri harus dihitung secara bertahap.
  • Throughput tinggi dan latensi rendah diinginkan untuk alur.

Catatan

Tabel streaming selalu didefinisikan terhadap sumber streaming. Anda juga dapat menggunakan sumber streaming dengan APPLY CHANGES INTO untuk menerapkan pembaruan dari umpan CDC. Lihat TERAPKAN PERUBAHAN API: Menyederhanakan perubahan pengambilan data di Tabel Langsung Delta.

Menggabungkan tabel streaming dan tampilan materialisasi dalam satu alur

Tabel streaming mewarisi jaminan pemrosesan Apache Spark Structured Streaming dan dikonfigurasi untuk memproses kueri dari sumber data khusus tambahan, di mana baris baru selalu dimasukkan ke dalam tabel sumber daripada dimodifikasi.

Catatan

Meskipun, secara default, tabel streaming memerlukan sumber data khusus tambahan, ketika sumber streaming adalah tabel streaming lain yang memerlukan pembaruan atau penghapusan, Anda dapat mengambil alih perilaku ini dengan bendera skipChangeCommits.

Pola streaming umum mencakup penyerapan data sumber untuk membuat himpunan data awal dalam alur. Himpunan data awal ini biasanya disebut tabel perunggu dan sering melakukan transformasi sederhana.

Sebaliknya, tabel akhir dalam alur, biasanya disebut sebagai tabel emas , sering memerlukan agregasi atau pembacaan rumit dari sumber yang merupakan target APPLY CHANGES INTO operasi. Karena operasi ini secara inheren membuat pembaruan daripada penampingan, operasi ini tidak didukung sebagai input ke tabel streaming. Transformasi ini lebih cocok untuk tampilan materialisasi.

Dengan mencampur tabel streaming dan tampilan materialisasi ke dalam satu alur, Anda dapat menyederhanakan alur Anda, menghindari penyerapan ulang atau pemrosesan ulang data mentah yang mahal, dan memiliki kekuatan penuh SQL untuk menghitung agregasi kompleks melalui himpunan data yang dikodekan dan difilter secara efisien. Contoh berikut mengilustrasikan jenis pemrosesan campuran ini:

Catatan

Contoh-contoh ini menggunakan Auto Loader untuk memuat file dari penyimpanan cloud. Untuk memuat file dengan Auto Loader dalam alur dengan dukungan Unity Catalog, Anda harus menggunakan lokasi eksternal. Untuk mempelajari selengkapnya tentang menggunakan Katalog Unity dengan Delta Live Tables, lihat Menggunakan Katalog Unity dengan alur Delta Live Tables Anda.

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

Pelajari lebih lanjut cara menggunakan Auto Loader untuk membaca file JSON secara efisien dari penyimpanan Azure untuk pemrosesan inkremental.

Gabungan statis aliran

Gabungan statis aliran adalah pilihan yang baik saat mendenormalisasi aliran berkelanjutan data khusus tambahan dengan tabel dimensi statis utamanya.

Dengan setiap pembaruan alur, rekaman baru dari aliran digabungkan dengan rekam jepret terbaru dari tabel statis. Jika rekaman ditambahkan atau diperbarui dalam tabel statis setelah data terkait dari tabel streaming diproses, rekaman yang dihasilkan tidak dihitung ulang kecuali refresh penuh dilakukan.

Dalam alur yang dikonfigurasi untuk eksekusi yang dipicu, tabel statis mengembalikan hasil pada saat pembaruan dimulai. Dalam alur yang dikonfigurasi untuk eksekusi berkelanjutan, setiap kali tabel memproses pembaruan, versi terbaru tabel statis dikueri.

Berikut ini adalah contoh gabungan statis aliran:

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

Menghitung agregat secara efisien

Anda dapat menggunakan tabel streaming untuk menghitung agregat distributif sederhana secara bertahap seperti jumlah, min, maks, atau jumlah, dan agregat aljabar seperti simpangan rata-rata atau standar. Databricks merekomendasikan agregasi inkremental untuk kueri dengan jumlah grup terbatas, misalnya, kueri dengan klausul GROUP BY country. Hanya data input baru yang dibaca dengan setiap pembaruan.

Untuk mempelajari selengkapnya tentang menulis kueri Delta Live Tables yang melakukan agregasi inkremental, lihat Melakukan agregasi berjendela dengan marka air.

Menggunakan model MLflow dalam alur Tabel Langsung Delta

Catatan

Untuk menggunakan model MLflow dalam alur yang mendukung Katalog Unity, alur Anda harus dikonfigurasi untuk menggunakan preview saluran. Untuk menggunakan current saluran, Anda harus mengonfigurasi alur untuk diterbitkan ke metastore Apache Hive.

Anda dapat menggunakan model terlatih MLflow di alur Delta Live Tables. Model MLflow diperlakukan sebagai transformasi di Azure Databricks, yang berarti model tersebut bertindak berdasarkan input Spark DataFrame dan mengembalikan hasil sebagai Spark DataFrame. Karena Tabel Langsung Delta menentukan himpunan data terhadap DataFrames, Anda dapat mengonversi beban kerja Apache Spark yang memanfaatkan MLflow ke Tabel Langsung Delta hanya dengan beberapa baris kode. Untuk informasi selengkapnya tentang MLflow, lihat Manajemen siklus hidup ML menggunakan MLflow.

Jika Anda sudah memiliki buku catatan Python yang memanggil model MLflow, Anda dapat menyesuaikan kode ini dengan Tabel Langsung Delta dengan menggunakan @dlt.table dekorator dan memastikan fungsi didefinisikan untuk mengembalikan hasil transformasi. Tabel Langsung Delta tidak menginstal MLflow secara default, jadi pastikan Anda %pip install mlflow dan impor mlflow dan dlt di bagian atas buku catatan Anda. Untuk pengenalan sintaks Tabel Langsung Delta, lihat Contoh: Menyerap dan memproses data nama bayi New York.

Untuk menggunakan model MLflow di Tabel Langsung Delta, selesaikan langkah-langkah berikut:

  1. Dapatkan ID eksekusi dan nama model model MLflow. ID eksekusi dan nama model digunakan untuk membangun URI model MLflow.
  2. Gunakan URI untuk menentukan Spark UDF untuk memuat model MLflow.
  3. Panggil UDF dalam definisi tabel Anda untuk menggunakan model MLflow.

Contoh berikut menunjukkan sintaks dasar untuk pola ini:

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

Sebagai contoh lengkap, kode berikut mendefinisikan Spark UDF bernama loaded_model_udf yang memuat model MLflow yang dilatih pada data risiko pinjaman. Kolom data yang digunakan untuk membuat prediksi diteruskan sebagai argumen ke UDF. Tabel loan_risk_predictions menghitung prediksi untuk setiap baris di loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

Pertahankan penghapusan atau pembaruan manual

Tabel Langsung Delta memungkinkan Anda menghapus atau memperbarui rekaman secara manual dari tabel dan melakukan operasi refresh untuk mengolah ulang tabel hilir.

Secara default, Tabel Langsung Delta mengolah ulang hasil tabel berdasarkan data input setiap kali alur diperbarui, jadi Anda harus memastikan rekaman yang dihapus tidak dimuat ulang dari data sumber. pipelines.reset.allowed Mengatur properti tabel untuk false mencegah refresh ke tabel tetapi tidak mencegah penulisan bertahap ke tabel atau mencegah data baru mengalir ke tabel.

Diagram berikut mengilustrasikan contoh menggunakan dua tabel streaming:

  • raw_user_table menyerap data pengguna mentah dari sumber.
  • bmi_table menghitung skor BMI secara bertahap menggunakan berat dan tinggi badan dari raw_user_table.

Anda ingin menghapus atau memperbarui rekaman pengguna secara manual dari raw_user_table dan mengolah bmi_tableulang .

Mempertahankan diagram data

Kode berikut menunjukkan pengaturan pipelines.reset.allowed properti tabel ke untuk false menonaktifkan refresh raw_user_table penuh agar perubahan yang dimaksudkan dipertahankan dari waktu ke waktu, tetapi tabel hilir dikomputasi ulang saat pembaruan alur dijalankan:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);