次の方法で共有


フローを追加

@dlt.append_flowデコレーターは、Lakeflow 宣言パイプライン テーブルの追加フローまたはバックフィルを作成します。 この関数は、Apache Spark ストリーミング DataFrame を返す必要があります。 「Lakeflow 宣言パイプライン フローを使用してデータを増分的に読み込んで処理する」を参照してください。

追加フローは、ストリーミング テーブルまたはシンクをターゲットにすることができます。

構文

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

パラメーター

パラメーター タイプ 説明
機能 function 必須。 ユーザー定義クエリから Apache Spark ストリーミング DataFrame を返す関数。
target str 必須。 追加フローのターゲットであるテーブルまたはシンクの名前。
name str フロー名。 指定しない場合は、既定で関数名が使用されます。
comment str フローの説明。
spark_conf dict このクエリを実行するための Spark 構成の一覧

例示

import dlt

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))