Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Důležité
Rozhraní API pipeline create_sink je ve verzi Public Preview.
Funkce create_sink() zapisuje do služby streamování událostí, jako je Apache Kafka nebo Azure Event Hubs, nebo do tabulky Delta z deklarativního kanálu. Po vytvoření zásobníku pomocí funkce create_sink() použijete zásobník v připojovacím toku k zápisu dat do něj. Připojovací tok je jediný typ toku podporovaný funkcí create_sink(). Jiné typy toků, například create_auto_cdc_flow, se nepodporují.
Delta sink podporuje externí a spravované tabulky Unity Catalog a spravované tabulky Hive metastore. Názvy tabulek musí být plně kvalifikované. Například tabulky katalogu Unity musí používat identifikátor třívrstvé: <catalog>.<schema>.<table>. Tabulky úložiště metadat Hive musí používat <schema>.<table>.
Poznámka:
- Spuštění úplné aktualizace nevymaže data z jímek. Do jímky budou připojena všechna znovu zpracovaná data a stávající data nebudou změněna.
- Očekávání nejsou podporována rozhraním
sinkAPI.
Syntaxe
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parametry
| Parameter | Typ | Description |
|---|---|---|
name |
str |
Povinné. Řetězec, který identifikuje jímku a slouží k odkazování a správě jímky. Názvy jímek musí být pro kanál jedinečné, včetně všech souborů zdrojového kódu, které jsou součástí kanálu. |
format |
str |
Povinné. Řetězec, který definuje výstupní formát, kafka nebo delta. |
options |
dict |
Seznam možností jímky formátovaný jako {"key": "value"}, kde klíč a hodnota jsou oba řetězce. Jsou podporovány všechny možnosti Databricks Runtime, které jsou podporovány jímkami Kafka a Delta.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)