共用方式為


append_flow

@dp.append_flow裝飾器會為您的管線資料表建立附加流程或回填。 函式必須傳回 Apache Spark 串流 DataFrame。 請參閱使用 Lakeflow Spark 宣告式管線流程以增量方式載入和處理資料。

追加流程可以以串流資料表或接收端為目標。

語法

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

參數

參數 類型 Description
函式 function 必須的。 用來從使用者定義的查詢中返回 Apache Spark 串流 DataFrame 的函數。
target str 必須的。 作為附加流程目標的資料表或接收器的名稱。
name str 流程名稱。 如果未提供,則預設為函式名稱。
once bool 或者,將流程定義為一次性流程,例如回填。 使用 once=True 將以兩種方式改變流程:
  • 傳回值。 streaming-query。 在這種情況下,必須是批次 DataFrame,而非串流 DataFrame。
  • 預設情況下,流程會執行一次。 如果管道已更新為完整刷新,則ONCE流程會再度執行以重新創建資料。
comment str 對流程的描述。
spark_conf dict 用於執行此查詢的 Spark 組態清單

範例

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))