Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
update_flow API находится в общедоступной предварительной версии.
@dp.update_flow Используйте декоратор для создания потока обновления. Потоки обновления записываются в приемники с помощью режима вывода обновления, создавая только строки, которые изменились в каждом пакете. В отличие от потоков добавления, они поддерживают агрегирование с отслеживанием состояния без необходимости подложки.
Потоки обновления могут использовать только целевые приемники. Разностные таблицы не поддерживаются.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Тип | Description |
|---|---|---|
| function | function |
Обязательно. Функция, возвращающая кадр данных потоковой передачи Apache Spark из определяемого пользователем запроса. |
target |
str |
Обязательно. Имя приемника, в который записывается поток. |
name |
str |
Имя потока. Если этот параметр не указан, по умолчанию используется имя функции. |
comment |
str |
Описание потока. |
spark_conf |
dict |
Дикт конфигураций Spark для выполнения этого запроса. Эти конфигурации переопределяют конфессии для назначения, конвейера или кластера. |
import_checkpoint |
str |
Внешний путь контрольной точки для импорта перед запуском потока. Импортированный только один раз, когда каталог контрольных точек потока еще не существует. |
Примеры
Агрегирование в приемник Kafka
Запись результатов агрегирования с отслеживанием состояния в приемник Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Режим реального времени
Используйте spark_conf для настройки потока обновления для режима реального времени:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Ограничения
- Приемники разностных таблиц не поддерживаются в качестве целевых объектов для потоков обновления.