Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Это важно
API конвейера create_sink находится в общедоступной предварительной версии.
Функция create_sink() записывается в службу потоковой передачи событий, например Apache Kafka или Центры событий Azure, или в таблицу Delta из декларативного конвейера. После создания приемника с функцией create_sink(), вы используете приемник в потоке добавления для записи данных в приемник. Единственный тип потока, поддерживаемый функцией create_sink(), — это поток добавления. Другие типы потоков, например create_auto_cdc_flow, не поддерживаются.
Delta sink поддерживает внешние и управляемые таблицы в каталоге Unity и управляемые таблицы в мета-хранилище Hive. Имена таблиц должны быть полностью квалифицированы. Например, таблицы каталога Unity должны использовать трехуровневый идентификатор: <catalog>.<schema>.<table>. Таблицы хранилища метаданных Hive должны использовать <schema>.<table>.
Замечание
- Запуск полного обновления не очищает данные из хранилищ. Все повторно обработанные данные будут добавлены в приемник, и существующие данные не будут изменены.
- Ожидания с API
sinkне поддерживаются.
Синтаксис
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Параметры
| Параметр | Тип | Description |
|---|---|---|
name |
str |
Обязательное. Строка, идентифицирующая приемник и используемая для ссылки на него и управления им. Имена приемников должны быть уникальными для конвейера, включая все файлы исходного кода, которые являются частью конвейера. |
format |
str |
Обязательное. Строка, определяющая выходной формат либо kafka, либо delta. |
options |
dict |
Список параметров приемника, отформатированный как {"key": "value"}, где ключ и значение являются обеими строками. Поддерживаются все параметры среды выполнения Databricks, поддерживаемые приемниками Kafka и Delta.
|
Примеры
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)