Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Von Bedeutung
Die Pipeline-API create_sink befindet sich in der öffentlichen Vorschau.
Die create_sink() Funktion schreibt in einen Ereignisstreamingdienst wie Apache Kafka oder Azure Event Hubs oder in eine Delta-Tabelle aus einer deklarativen Pipeline. Nachdem Sie eine Spüle mit der create_sink()-Funktion erstellt haben, verwenden Sie die Spüle in einem Anfügefluss, um Daten in die Spüle zu schreiben. Der Anfügeflow ist der einzige Flowtyp, der mit der create_sink()-Funktion unterstützt wird. Andere Flusstypen, z. B. create_auto_cdc_flow, werden nicht unterstützt.
Die Delta-Senke unterstützt externe und verwaltete Unity Catalog-Tabellen sowie verwaltete Tabellen im Hive-Metastore. Tabellennamen müssen vollqualifiziert sein. Beispielsweise müssen Unity-Katalogtabellen einen dreistufigen Bezeichner verwenden: <catalog>.<schema>.<table>. Hive-Metaspeichertabellen müssen <schema>.<table>verwenden.
Hinweis
- Beim Ausführen eines vollständigen Aktualisierungsupdates werden keine Daten aus Senken gelöscht. Alle nachverarbeiteten Daten werden an die Datensenke angefügt, und vorhandene Daten werden nicht geändert.
- Erwartungen werden mit der
sinkAPI nicht unterstützt.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Die Parameter
| Parameter | Typ | Description |
|---|---|---|
name |
str |
Erforderlich. Eine Zeichenfolge, die die Senke identifiziert und verwendet wird, um auf die Senke zu verweisen und diese zu verwalten. Sink-Namen müssen innerhalb der Pipeline eindeutig sein, auch über alle Quellcodedateien hinweg, die Teil der Pipeline sind. |
format |
str |
Erforderlich. Eine Zeichenfolge, die das Ausgabeformat definiert, entweder kafka oder delta. |
options |
dict |
Eine Liste mit Senkenoptionen im Format {"key": "value"}, wobei Schlüssel und Wert Zeichenfolgen sind. Alle von den Kafka- und Delta-Senken unterstützten Databricks Runtime-Optionen werden unterstützt.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)