다음을 통해 공유


append_flow

@dp.append_flow 데코레이터는 파이프라인 테이블에 대한 추가 흐름 또는 백필을 생성합니다. 함수는 Apache Spark 스트리밍 데이터 프레임을 반환해야 합니다. Lakeflow Spark 선언적 파이프라인 흐름을 사용하여 데이터 증분 로드 및 처리를 참조하세요.

추가 흐름은 스트리밍 테이블 또는 싱크를 대상으로 할 수 있습니다.

문법

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

매개 변수

매개 변수 유형 Description
기능 function 필수 사항입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다.
target str 필수 사항입니다. 추가 흐름의 대상인 테이블 또는 싱크의 이름입니다.
name str 흐름 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다.
once bool 필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다. once=True 다음 두 가지 방법으로 흐름을 변경합니다.
  • 반환 값입니다. streaming-query; 이 경우는 스트리밍 데이터 프레임이 아닌 배치 DataFrame이어야 합니다.
  • 흐름은 기본적으로 한 번 실행됩니다. 파이프라인이 전체 새로 고침으로 업데이트되면 흐름이 ONCE 다시 실행되어 데이터를 다시 만듭니다.
comment str 흐름에 대한 설명입니다.
spark_conf dict 이 쿼리 실행을 위한 Spark 구성 목록

예시

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))