Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
De create_auto_cdc_flow() functie creëert een stroom die gebruikmaakt van de declaratieve pijplijnen van Lakeflow Spark om met de functionaliteit voor change data capture (CDC) brongegevens uit een change data feed (CDF) te verwerken.
Opmerking
Deze functie vervangt de vorige functie apply_changes(). De twee functies hebben dezelfde handtekening. Databricks raadt het bijwerken aan om de nieuwe naam te gebruiken.
Belangrijk
U moet een doelstreaming tabel declareren waarvoor wijzigingen toegepast moeten worden. U kunt desgewenst het schema voor uw doeltabel opgeven. Wanneer u het schema van de create_auto_cdc_flow() doeltabel opgeeft, moet u de __START_AT en __END_AT kolommen met hetzelfde gegevenstype als de sequence_by velden opnemen.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken in de Python-interface van de pijplijn.
Syntaxis
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Voor create_auto_cdc_flow verwerking is het standaardgedrag voor INSERT en UPDATE gebeurtenissen het uitvoeren van upsert CDC-gebeurtenissen uit de bron: werk alle rijen in de doeltabel bij die overeenkomen met de opgegeven sleutel(en) of voeg een nieuwe rij in wanneer er geen overeenkomende record in de doeltabel bestaat. Verwerking voor DELETE gebeurtenissen kan worden opgegeven met de apply_as_deletes parameter.
Zie de AUTO CDC-API's voor meer informatie over CDC-verwerking met een wijzigingenfeed: Het vastleggen van wijzigingsgegevens vereenvoudigen met pijplijnen. Zie create_auto_cdc_flow() voor een voorbeeld van het gebruik van de functie.
Parameterwaarden
| Kenmerk | Typologie | Description |
|---|---|---|
target |
str |
Verplicht. De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de create_auto_cdc_flow() functie uitvoert. |
source |
str |
Verplicht. De gegevensbron met CDC-records. |
keys |
list |
Verplicht. De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven:
|
sequence_by |
str, col() of struct() |
Verplicht. De kolomnamen die de logische volgorde van CDC-gebeurtenissen in de brongegevens opgeven. Lakeflow Spark Declarative Pipelines maakt gebruik van deze sequencing om wijzigingsevenementen af te handelen die buiten de volgorde aankomen. De opgegeven kolom moet een sorteerbaar gegevenstype zijn. U kunt een van de volgende opties opgeven:
|
ignore_null_updates |
bool |
Toestaan dat updates worden opgenomen die een subset van de doelkolommen bevatten. Wanneer een CDC-gebeurtenis overeenkomt met een bestaande rij en ignore_null_updates is True, behouden kolommen met een null hun bestaande waarden in het doelbestand. Dit geldt ook voor geneste kolommen met een waarde van null. Wanneer ignore_null_updatesFalse is, worden bestaande waarden overschreven met null waarden.De standaardwaarde is False. |
apply_as_deletes |
str of expr() |
Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een DELETE in plaats van een upsert. U kunt een van de volgende opties opgeven:
Als u gegevens buiten volgorde wilt verwerken, wordt de verwijderde rij tijdelijk bewaard als tombstone in de onderliggende Delta-tabel en wordt er een weergave gemaakt in de metastore die deze tombstones filtert. Het retentie-interval wordt standaard ingesteld op twee dagen en kan worden geconfigureerd met de pipelines.cdc.tombstoneGCThresholdInSeconds tabeleigenschap. |
apply_as_truncates |
str of expr() |
Hiermee geeft u op wanneer een CDC-gebeurtenis moet worden behandeld als een volledige tabel TRUNCATE. U kunt een van de volgende opties opgeven:
Omdat deze clausule een volledige truncatie van de doeltabel activeert, moet deze alleen worden gebruikt voor specifieke use cases die deze functionaliteit vereisen. De apply_as_truncates parameter wordt alleen ondersteund voor SCD-type 1. SCD type 2 ondersteunt geen truncate-operaties. |
column_list of except_column_list |
list |
Een subset van kolommen om op te nemen in de doeltabel. Hiermee column_list geeft u de volledige lijst met kolommen op die u wilt opnemen. Gebruik except_column_list dit om de kolommen op te geven die moeten worden uitgesloten. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId). De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen column_list of except_column_list argument wordt doorgegeven aan de functie. |
stored_as_scd_type |
str of int |
Om te bepalen of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2. De standaardwaarde is SCD type 1. |
track_history_column_list of track_history_except_column_list |
list |
Een subset van uitvoerkolommen om bij te houden voor historische doeleinden in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Hiermee track_history_except_column_list geeft u de kolommen op die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId). De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen track_history_column_list of track_history_except_column_list argument wordt doorgegeven aan de functie. |
name |
str |
De naam van de stroom. Als deze niet is opgegeven, wordt standaard ingesteld op dezelfde waarde als target. |
once |
bool |
U kunt de stroom desgewenst definiëren als een eenmalige stroom, zoals een backfill. Door once=True te gebruiken verandert de stroom op twee manieren:
|