Sdílet prostřednictvím


vytvořit sink

Důležité

Rozhraní API pipeline create_sink je ve verzi Public Preview.

Funkce create_sink() zapisuje do služby streamování událostí, jako je Apache Kafka nebo Azure Event Hubs, nebo do tabulky Delta z deklarativního kanálu. Po vytvoření zásobníku pomocí funkce create_sink() použijete zásobník v připojovacím toku k zápisu dat do něj. Připojovací tok je jediný typ toku podporovaný funkcí create_sink(). Jiné typy toků, například create_auto_cdc_flow, se nepodporují.

Delta sink podporuje externí a spravované tabulky Unity Catalog a spravované tabulky Hive metastore. Názvy tabulek musí být plně kvalifikované. Například tabulky katalogu Unity musí používat identifikátor třívrstvé: <catalog>.<schema>.<table>. Tabulky úložiště metadat Hive musí používat <schema>.<table>.

Poznámka:

  • Spuštění úplné aktualizace nevymaže data z jímek. Do jímky budou připojena všechna znovu zpracovaná data a stávající data nebudou změněna.
  • Očekávání nejsou podporována rozhraním sink API.

Syntaxe

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parametry

Parameter Typ Description
name str Povinné. Řetězec, který identifikuje jímku a slouží k odkazování a správě jímky. Názvy jímek musí být pro kanál jedinečné, včetně všech souborů zdrojového kódu, které jsou součástí kanálu.
format str Povinné. Řetězec, který definuje výstupní formát, kafka nebo delta.
options dict Seznam možností jímky formátovaný jako {"key": "value"}, kde klíč a hodnota jsou oba řetězce. Jsou podporovány všechny možnosti Databricks Runtime, které jsou podporovány jímkami Kafka a Delta.

Examples

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)