Bagikan melalui


Memuat dan memproses data secara bertahap dengan alur Tabel Langsung Delta

Artikel ini menjelaskan alur apa dan bagaimana Anda dapat menggunakan alur di alur Tabel Langsung Delta untuk memproses data secara bertahap dari sumber ke tabel streaming target. Di Tabel Langsung Delta, alur didefinisikan dengan dua cara:

  1. Alur ditentukan secara otomatis saat Anda membuat kueri yang memperbarui tabel streaming.
  2. Tabel Langsung Delta juga menyediakan fungsionalitas untuk secara eksplisit menentukan alur untuk pemrosesan yang lebih kompleks seperti menambahkan ke tabel streaming dari beberapa sumber streaming.

Artikel ini membahas alur implisit yang dibuat saat Anda menentukan kueri untuk memperbarui tabel streaming, lalu menyediakan detail tentang sintaks untuk menentukan alur yang lebih kompleks.

Apa itu alur?

Di Tabel Langsung Delta, alur adalah kueri streaming yang memproses data sumber secara bertahap untuk memperbarui tabel streaming target. Sebagian besar himpunan data Tabel Langsung Delta yang Anda buat dalam alur menentukan alur sebagai bagian dari kueri dan tidak memerlukan pendefinisian alur secara eksplisit. Misalnya, Anda membuat tabel streaming di Tabel Langsung Delta dalam satu perintah DDL alih-alih menggunakan tabel terpisah dan pernyataan alur untuk membuat tabel streaming:

Catatan

Contoh ini CREATE FLOW disediakan hanya untuk tujuan ilustrasi dan menyertakan kata kunci yang tidak valid sintaks Tabel Langsung Delta.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Selain alur default yang ditentukan oleh kueri, antarmuka Delta Live Tables Python dan SQL menyediakan fungsionalitas alur penambahan. Alur tambahan mendukung pemrosesan yang memerlukan pembacaan data dari beberapa sumber streaming untuk memperbarui satu tabel streaming. Misalnya, Anda dapat menggunakan fungsionalitas alur penambahan saat Anda memiliki tabel dan alur streaming yang ada dan ingin menambahkan sumber streaming baru yang menulis ke tabel streaming yang ada ini.

Menggunakan tambahkan alur untuk menulis ke tabel streaming dari beberapa aliran sumber

Catatan

Untuk menggunakan pemrosesan alur tambahan, alur Anda harus dikonfigurasi untuk menggunakan saluran pratinjau.

@append_flow Gunakan dekorator di antarmuka Python atau CREATE FLOW klausa di antarmuka SQL untuk menulis ke tabel streaming dari beberapa sumber streaming. Gunakan alur tambahan untuk memproses tugas seperti berikut ini:

  • Tambahkan sumber streaming yang menambahkan data ke tabel streaming yang ada tanpa memerlukan refresh penuh. Misalnya, Anda mungkin memiliki tabel yang menggabungkan data regional dari setiap wilayah tempat Anda beroperasi. Saat wilayah baru diluncurkan, Anda dapat menambahkan data wilayah baru ke tabel tanpa melakukan refresh penuh. Lihat Contoh: Menulis ke tabel streaming dari beberapa topik Kafka.
  • Perbarui tabel streaming dengan menambahkan data historis yang hilang (backfilling). Misalnya, Anda memiliki tabel streaming yang sudah ada yang ditulis oleh topik Apache Kafka. Anda juga memiliki data historis yang disimpan dalam tabel yang perlu Anda sisipkan tepat sekali ke dalam tabel streaming, dan Anda tidak dapat mengalirkan data karena pemrosesan Anda mencakup melakukan agregasi kompleks sebelum menyisipkan data. Lihat Contoh: Menjalankan isi ulang data satu kali.
  • Gabungkan data dari beberapa sumber dan tulis ke satu tabel streaming alih-alih menggunakan UNION klausa dalam kueri. Menggunakan pemrosesan alur tambahan alih-alih UNION memungkinkan Anda memperbarui tabel target secara bertahap tanpa menjalankan pembaruan refresh penuh. Lihat Contoh: Menggunakan pemrosesan alur tambahan alih-alih UNION.

Target untuk output rekaman oleh pemrosesan alur tambahan dapat berupa tabel yang sudah ada atau tabel baru. Untuk kueri Python, gunakan fungsi create_streaming_table() untuk membuat tabel target.

Penting

  • Jika Anda perlu menentukan batasan kualitas data dengan harapan, tentukan harapan pada tabel target sebagai bagian create_streaming_table() dari fungsi atau pada definisi tabel yang ada. Anda tidak dapat menentukan ekspektasi dalam @append_flow definisi.
  • Alur diidentifikasi dengan nama alur, dan nama ini digunakan untuk mengidentifikasi titik pemeriksaan streaming. Penggunaan nama alur untuk mengidentifikasi titik pemeriksaan berarti sebagai berikut:
    • Jika alur yang ada dalam alur diganti namanya, titik pemeriksaan tidak dilakukan, dan alur yang diganti namanya secara efektif merupakan alur yang sama sekali baru.
    • Anda tidak dapat menggunakan kembali nama alur dalam alur, karena titik pemeriksaan yang ada tidak akan cocok dengan definisi alur baru.

Berikut ini adalah sintaks untuk @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Contoh: Menulis ke tabel streaming dari beberapa topik Kafka

Contoh berikut membuat tabel streaming bernama kafka_target dan menulis ke tabel streaming tersebut dari dua topik Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Untuk mempelajari selengkapnya tentang fungsi bernilai tabel yang read_kafka() digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.

Contoh: Menjalankan isi ulang data satu kali

Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:

Catatan

Untuk memastikan isi ulang satu kali yang benar saat kueri isi ulang adalah bagian dari alur yang berjalan secara terjadwal atau terus menerus, hapus kueri setelah menjalankan alur sekali. Untuk menambahkan data baru jika tiba di direktori isi ulang, biarkan kueri di tempat.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Contoh: Gunakan pemrosesan alur tambahan alih-alih UNION

Alih-alih menggunakan kueri dengan UNION klausul, Anda bisa menggunakan kueri alur tambahan untuk menggabungkan beberapa sumber dan menulis ke satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh.

Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan UNION klausa:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Contoh berikut mengganti UNION kueri dengan kueri alur tambahan:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );