Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Viktigt!
Pipeline-API:et create_sink finns i offentlig förhandsversion.
Funktionen create_sink() skriver till en händelseströmningstjänst som Apache Kafka eller Azure Event Hubs eller till en Delta-tabell från en deklarativ pipeline. När du har skapat en mottagare med funktionen create_sink() använder du mottagaren i ett tilläggsflöde för att skriva data till mottagaren. tilläggsflöde är den enda flödestyp som stöds med funktionen create_sink(). Andra flödestyper, till exempel create_auto_cdc_flow, stöds inte.
Delta-sink stöder externa och hanterade tabeller i Unity Catalog samt hanterade tabeller i Hive-metadatalager. Namnen på tabeller måste vara fullständigt kvalificerade. Unity Catalog-tabeller måste till exempel använda en identifierare på tre nivåer: <catalog>.<schema>.<table>. Hive-metaarkivtabeller måste använda <schema>.<table>.
Anmärkning
- Om du kör en fullständig uppdatering rensas inte data från mottagare. Alla ombearbetade data läggs till i datamottagaren och befintliga data ändras inte.
- Förväntningar stöds inte med API:et
sink.
Syntax
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parameterar
| Parameter | Typ | Description |
|---|---|---|
name |
str |
Obligatoriskt. En sträng som identifierar sänkan och används för att referera till och hantera sänkan. Sinksnamn måste vara unika för pipelinen, inklusive alla källkodsfiler som är en del av pipelinen. |
format |
str |
Obligatoriskt. En sträng som definierar utdataformatet, antingen kafka eller delta. |
options |
dict |
En lista över alternativ för mottagare, formaterad som {"key": "value"}, där nyckeln och värdet är båda strängarna. Alla Databricks Runtime-alternativ som stöds av Kafka- och Delta-mottagare stöds.
|
Examples
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Create an external Delta table sink with a file path
dp.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)