Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
L’API de pipeline create_sink est en préversion publique.
La create_sink() fonction écrit dans un service de streaming d’événements, comme Apache Kafka ou Azure Event Hubs, ou dans une table Delta à partir d’un pipeline déclaratif. Après avoir créé un récepteur avec la fonction create_sink(), vous utilisez le récepteur dans un flux d’ajout pour écrire des données dans le récepteur. Le flux d’ajout est le seul type de flux pris en charge avec la fonction create_sink(). D’autres types de flux, tels que create_auto_cdc_flow, ne sont pas pris en charge.
Le Delta prend en charge les tables externes et gérées du catalogue Unity et les tables gérées du metastore Hive. Les noms de table doivent être complets. Par exemple, les tables du catalogue Unity doivent utiliser un identificateur à trois niveaux : <catalog>.<schema>.<table>. Les tables de metastore Hive doivent utiliser <schema>.<table>.
Note
- Exécuter une mise à jour d’actualisation complète n’efface pas les données des récepteurs. Toutes les données traitées seront ajoutées au récepteur et les données existantes ne seront pas modifiées.
- Les attentes ne sont pas prises en charge avec l’API
sink.
Syntaxe
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Paramètres
| Paramètre | Type | Descriptif |
|---|---|---|
name |
str |
Obligatoire. Chaîne qui identifie le récepteur et est utilisée pour référencer et gérer le récepteur. Les noms des récepteurs doivent être uniques dans le pipeline, y compris dans tous les fichiers de code source qui font partie du pipeline. |
format |
str |
Obligatoire. Chaîne qui définit le format de sortie, kafka ou delta. |
options |
dict |
Liste des options de sink, formatées comme {"key": "value"}, où la clé et la valeur sont toutes deux des chaînes de caractères. Toutes les options Databricks Runtime prises en charge par les récepteurs Kafka et Delta sont supportées.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)