Bagikan melalui


Contoh alur dalam Lakeflow Spark Declarative Pipelines

Contoh: Menulis ke tabel streaming dari beberapa topik Kafka

Contoh berikut membuat tabel streaming bernama kafka_target dan menulis ke tabel streaming tersebut dari dua topik Kafka:

Phyton

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Untuk mempelajari selengkapnya tentang fungsi bernilai tabel yang read_kafka() digunakan dalam kueri SQL, lihat read_kafka dalam referensi bahasa SQL.

Di Python, Anda dapat secara terprogram membuat beberapa alur yang menargetkan satu tabel. Contoh berikut menunjukkan pola ini untuk daftar topik Kafka.

Nota

Pola ini memiliki persyaratan yang sama seperti menggunakan perulangan for untuk membuat tabel. Anda harus secara eksplisit meneruskan nilai Python ke fungsi yang menentukan alur. Lihat Buat tabel dalam loopfor.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

Contoh: Menjalankan isi ulang data satu kali

Jika Anda ingin menjalankan kueri untuk menambahkan data ke tabel streaming yang sudah ada, gunakan append_flow.

Setelah menambahkan sekumpulan data yang sudah ada, Anda memiliki beberapa opsi:

  • Jika Anda ingin kueri menambahkan data baru jika tiba di direktori isi ulang, biarkan kueri di tempatnya.
  • Jika Anda ingin ini menjadi isi ulang satu kali, dan jangan pernah berjalan lagi, hapus kueri setelah menjalankan alur sekali.
  • Jika Anda ingin kueri berjalan satu kali, dan hanya berjalan lagi dalam kasus di mana data mengalami penyegaran penuh, atur once parameter ke True pada alur penyambungan. Di SQL, gunakan INSERT INTO ONCE.

Contoh berikut menjalankan kueri untuk menambahkan data historis ke tabel streaming:

Phyton

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Untuk contoh yang lebih mendalam, lihat Mengisi ulang data historis dengan alur.

Contoh: Gunakan pemrosesan alur tambahan alih-alih UNION

Daripada menggunakan kueri dengan UNION klausul, Anda dapat menggunakan kueri alur penambahan untuk menggabungkan beberapa sumber dan memasukkannya ke dalam satu tabel streaming. Menggunakan kueri alur tambahan alih-alih UNION memungkinkan Anda menambahkan ke tabel streaming dari beberapa sumber tanpa menjalankan refresh penuh.

Contoh Python berikut menyertakan kueri yang menggabungkan beberapa sumber data dengan UNION klausa:

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

Contoh berikut mengganti kueri pencarian UNION dengan kueri pengaliran tambahan.

Phyton

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );