Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
La API de canalización create_sink está en versión preliminar pública.
La create_sink() función escribe en un servicio de streaming de eventos, como Apache Kafka o Azure Event Hubs, o en una tabla Delta desde una canalización declarativa. Después de crear un receptor con la función create_sink(), use el receptor en un flujo de anexión para escribir datos en el receptor. El flujo de anexión es el único tipo de flujo admitido con la función create_sink(). No se admiten otros tipos de flujo, como create_auto_cdc_flow, .
El receptor Delta admite tablas externas y administradas de Unity Catalog, así como tablas administradas por el metastore de Hive. Los nombres de tabla deben ser completos. Por ejemplo, las tablas de Catálogo de Unity deben usar un identificador de tres niveles: <catalog>.<schema>.<table>. Las tablas de metastore de Hive deben usar <schema>.<table>.
Nota:
- La ejecución de una actualización completa no borra los datos de los receptores. Los datos reprocesados se anexarán al receptor y no se modificarán los datos existentes.
- Las expectativas no se admiten a través de la
sinkAPI.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parámetros
| Parámetro | Tipo | Description |
|---|---|---|
name |
str |
Obligatorio. Cadena que identifica el receptor y se usa para hacer referencia al receptor y administrarlo. Los nombres de sumidero deben ser únicos en la canalización, incluidos en todos los archivos de código fuente que forman parte de la canalización. |
format |
str |
Obligatorio. Cadena que define el formato de salida, ya sea kafka o delta. |
options |
dict |
Lista de opciones de receptor, en formato {"key": "value"}, donde tanto la clave como el valor son cadenas. Se admiten todas las opciones de Databricks Runtime que son compatibles con los receptores Kafka y Delta.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)