Ескертпе
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Жүйеге кіруді немесе каталогтарды өзгертуді байқап көруге болады.
Бұл бетке кіру үшін қатынас шегін айқындау қажет. Каталогтарды өзгертуді байқап көруге болады.
Декоратор @dp.append_flow создает потоки добавления данных или обратное наполнение для таблиц конвейера. Функция должна возвращать кадр данных потоковой передачи Apache Spark. Дополнительные сведения о загрузке и обработке данных см. в потоках декларативных конвейеров Lakeflow Spark.
Потоки добавления могут быть направлены на таблицы потоков или приемники.
Синтаксис
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Параметры
| Параметр | Тип | Description |
|---|---|---|
| function | function |
Обязательное. Функция, возвращающая кадр данных потоковой передачи Apache Spark из определяемого пользователем запроса. |
target |
str |
Обязательное. Имя таблицы или приемника, которое является целевым объектом потока добавления. |
name |
str |
Имя потока. Если этот параметр не указан, по умолчанию используется имя функции. |
once |
bool |
При необходимости определите поток как одноразовый, например, обратная закачка. Использование once=True меняет поток двумя способами:
|
comment |
str |
Описание потока. |
spark_conf |
dict |
Список конфигураций Spark для выполнения этого запроса |
Примеры
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))