Megosztás a következőn keresztül:


create_sink

Fontos

A csővezeték create_sink API nyilvános előzetes megtekintésként érhető el.

A create_sink() függvény egy eseménystreamelési szolgáltatásba, például az Apache Kafkába vagy az Azure Event Hubsba vagy egy deklaratív folyamatból egy Delta-táblába ír. Miután létrehozott egy gyűjtőhelyet a create_sink() függvénnyel, a gyűjtőhelyet egy hozzáfűzési folyamatban használja az adatok hozzáfűzésére. a hozzáfűzési folyam az egyetlen folyamattípus, amelyet az create_sink() függvény támogat. Más folyamattípusok, például create_auto_cdc_flow, nem támogatottak.

A Delta sink támogatja a Unity Catalog külső és felügyelt tábláit, valamint a Hive metaadattár által felügyelt táblákat. A táblaneveknek teljesen specifikáltnak kell lenniük. A Unity Catalog tábláinak például háromszintű azonosítót kell használniuk: <catalog>.<schema>.<table>. A Hive metaadattártábláinak <schema>.<table>kell használniuk.

Megjegyzés:

  • A teljes frissítés futtatása nem törli az adatokat a tárolóhelyekről. Az újrafeldolgozott adatokat hozzáfűzzük a tárolóhoz, és a meglévő adatok nem módosulnak.
  • Az API nem támogatja az sink elvárásokat.

Szemantika

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Paraméterek

Paraméter Típus Description
name str Szükséges. Egy karaktersor, amely azonosítja a csatornát, és a csatorna hivatkozására és kezelésére szolgál. A fogadó neveknek egyedieknek kell lenniük az adatfolyamban, beleértve az adatfolyam részét képező összes forráskódfájlt is.
format str Szükséges. A kimeneti formátumot definiáló sztring, kafka vagy delta.
options dict A fogadó beállításainak listája, formázva {"key": "value"}, ahol a kulcs és az érték egyaránt sztring. A Kafka és a Delta csatlakozási pontok által támogatott összes Databricks-futtatókörnyezeti beállítás támogatott.

Példák

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)