Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Von Bedeutung
Die Pipeline-API create_sink befindet sich in der öffentlichen Vorschau.
Die create_sink() Funktion schreibt in einen Ereignisstreamingdienst wie Apache Kafka oder Azure Event Hubs oder in eine Delta-Tabelle aus einer deklarativen Pipeline. Nachdem Sie eine Spüle mit der create_sink()-Funktion erstellt haben, verwenden Sie die Spüle in einem Anfügefluss, um Daten in die Spüle zu schreiben. Der Anfügeflow ist der einzige Flowtyp, der mit der create_sink()-Funktion unterstützt wird. Andere Flusstypen, z. B. create_auto_cdc_flow, werden nicht unterstützt.
Die Delta-Senke unterstützt externe und verwaltete Unity Catalog-Tabellen sowie verwaltete Tabellen im Hive-Metastore. Tabellennamen müssen vollqualifiziert sein. Beispielsweise müssen Unity-Katalogtabellen einen dreistufigen Bezeichner verwenden: <catalog>.<schema>.<table>. Hive-Metaspeichertabellen müssen <schema>.<table>verwenden.
Hinweis
- Beim Ausführen eines vollständigen Aktualisierungsupdates werden keine Daten aus Senken gelöscht. Alle nachverarbeiteten Daten werden an die Datensenke angefügt, und vorhandene Daten werden nicht geändert.
- Erwartungen werden mit der
sinkAPI nicht unterstützt.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Die Parameter
| Parameter | Typ | Description |
|---|---|---|
name |
str |
Erforderlich. Eine Zeichenfolge, die die Senke identifiziert und verwendet wird, um auf die Senke zu verweisen und diese zu verwalten. Sink-Namen müssen innerhalb der Pipeline eindeutig sein, auch über alle Quellcodedateien hinweg, die Teil der Pipeline sind. |
format |
str |
Erforderlich. Eine Zeichenfolge, die das Ausgabeformat definiert, entweder kafka oder delta. |
options |
dict |
Eine Liste mit Senkenoptionen im Format {"key": "value"}, wobei Schlüssel und Wert Zeichenfolgen sind. Alle von den Kafka- und Delta-Senken unterstützten Databricks Runtime-Optionen werden unterstützt.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)