@dp.append_flow 데코레이터는 파이프라인 테이블에 대한 추가 흐름 또는 백필을 생성합니다. 함수는 Apache Spark 스트리밍 데이터 프레임을 반환해야 합니다.
Lakeflow Spark 선언적 파이프라인 흐름을 사용하여 데이터 증분 로드 및 처리를 참조하세요.
추가 흐름은 스트리밍 테이블 또는 싱크를 대상으로 할 수 있습니다.
문법
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
매개 변수
| 매개 변수 | 유형 | Description |
|---|---|---|
| 기능 | function |
필수 사항입니다. 사용자 정의 쿼리에서 Apache Spark 스트리밍 DataFrame을 반환하는 함수입니다. |
target |
str |
필수 사항입니다. 추가 흐름의 대상인 테이블 또는 싱크의 이름입니다. |
name |
str |
흐름 이름입니다. 제공되지 않으면 기본적으로 함수 이름이 지정됩니다. |
once |
bool |
필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다.
once=True 다음 두 가지 방법으로 흐름을 변경합니다.
|
comment |
str |
흐름에 대한 설명입니다. |
spark_conf |
dict |
이 쿼리 실행을 위한 Spark 구성 목록 |
예시
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))