Freigeben über


Erstelle_Senke

Von Bedeutung

Die Pipeline-API create_sink befindet sich in der öffentlichen Vorschau.

Die create_sink() Funktion schreibt in einen Ereignisstreamingdienst wie Apache Kafka oder Azure Event Hubs oder in eine Delta-Tabelle aus einer deklarativen Pipeline. Nachdem Sie eine Spüle mit der create_sink()-Funktion erstellt haben, verwenden Sie die Spüle in einem Anfügefluss, um Daten in die Spüle zu schreiben. Der Anfügeflow ist der einzige Flowtyp, der mit der create_sink()-Funktion unterstützt wird. Andere Flusstypen, z. B. create_auto_cdc_flow, werden nicht unterstützt.

Die Delta-Senke unterstützt externe und verwaltete Unity Catalog-Tabellen sowie verwaltete Tabellen im Hive-Metastore. Tabellennamen müssen vollqualifiziert sein. Beispielsweise müssen Unity-Katalogtabellen einen dreistufigen Bezeichner verwenden: <catalog>.<schema>.<table>. Hive-Metaspeichertabellen müssen <schema>.<table>verwenden.

Hinweis

  • Beim Ausführen eines vollständigen Aktualisierungsupdates werden keine Daten aus Senken gelöscht. Alle nachverarbeiteten Daten werden an die Datensenke angefügt, und vorhandene Daten werden nicht geändert.
  • Erwartungen werden mit der sink API nicht unterstützt.

Syntax

from pyspark import pipelines as dp

dp.create_sink(name=<sink_name>, format=<format>, options=<options>)

Die Parameter

Parameter Typ Description
name str Erforderlich. Eine Zeichenfolge, die die Senke identifiziert und verwendet wird, um auf die Senke zu verweisen und diese zu verwalten. Sink-Namen müssen innerhalb der Pipeline eindeutig sein, auch über alle Quellcodedateien hinweg, die Teil der Pipeline sind.
format str Erforderlich. Eine Zeichenfolge, die das Ausgabeformat definiert, entweder kafka oder delta.
options dict Eine Liste mit Senkenoptionen im Format {"key": "value"}, wobei Schlüssel und Wert Zeichenfolgen sind. Alle von den Kafka- und Delta-Senken unterstützten Databricks Runtime-Optionen werden unterstützt.

Examples

from pyspark import pipelines as dp

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)