Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
Deze functionaliteit bevindt zich in openbare preview.
Met de create_auto_cdc_from_snapshot_flow functie wordt een stroom gecreëerd die gebruikmaakt van de Lakeflow Spark declaratieve pijplijnen en de functionaliteit voor het vastleggen van gegevenswijzigingen (CDC) om brondata van momentopnamen van databases te verwerken. Zie Hoe wordt CDC geïmplementeerd met de AUTO CDC FROM SNAPSHOT API?
Opmerking
Deze functie vervangt de vorige functie apply_changes_from_snapshot(). De twee functies hebben dezelfde handtekening. Databricks raadt het bijwerken aan om de nieuwe naam te gebruiken.
Belangrijk
Voor deze bewerking moet u een gerichte streamingstabel hebben.
Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken.
Syntaxis
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Opmerking
Voor AUTO CDC FROM SNAPSHOT verwerking is het standaardgedrag het invoegen van een nieuwe rij wanneer een overeenkomende record met dezelfde sleutel(en) niet in het doel bestaat. Als er wel een overeenkomende record bestaat, wordt deze alleen bijgewerkt als een van de waarden in de rij is gewijzigd. Rijen met sleutels die aanwezig zijn in de doeldatabase, maar die niet meer aanwezig zijn in de brondatabase, worden verwijderd.
Zie de AUTO CDC-API's voor meer informatie over CDC-verwerking met momentopnamen: Het vastleggen van wijzigingsgegevens vereenvoudigen met pijplijnen. Zie de voorbeelden van periodieke create_auto_cdc_from_snapshot_flow() en historische momentopnameopnamen voor voorbeelden van het gebruik van de functie.
Parameterwaarden
| Kenmerk | Typologie | Description |
|---|---|---|
target |
str |
Verplicht. De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de create_auto_cdc_from_snapshot_flow() functie uitvoert. |
source |
str of lambda function |
Verplicht. Ofwel de naam van een tabel of weergave om periodiek een momentopname te maken of een Python-lambda-functie die de dataframe voor momentopnamen retourneert die moet worden verwerkt en de versie van de momentopname. Zie Het argument implementerensource. |
keys |
list |
Verplicht. De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven:
|
stored_as_scd_type |
str of int |
Om te bepalen of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2. De standaardwaarde is SCD type 1. |
track_history_column_list of track_history_except_column_list |
list |
Een subset van uitvoerkolommen om bij te houden voor historische doeleinden in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Hiermee track_history_except_column_list geeft u de kolommen op die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId). De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen track_history_column_list of track_history_except_column_list argument wordt doorgegeven aan de functie. |
source Het argument implementeren
De create_auto_cdc_from_snapshot_flow() functie bevat het source argument. Voor het verwerken van historische momentopnamen wordt verwacht dat het source argument een Python-lambda-functie is die twee waarden retourneert aan de create_auto_cdc_from_snapshot_flow() functie: een Python DataFrame met de momentopnamegegevens die moeten worden verwerkt en een momentopnameversie.
Hier volgt de handtekening van de lambda-functie:
lambda Any => Optional[(DataFrame, Any)]
- Het argument voor de lambda-functie is de laatst verwerkte momentopnameversie.
- De retourwaarde van de lambda-functie is
Noneof een tuple van twee waarden: de eerste waarde van de tuple is een DataFrame dat de momentopname bevat die moet worden verwerkt. De tweede waarde van de tuple is de momentopnameversie die de logische volgorde van de momentopname aangeeft.
Een voorbeeld waarmee de lambda-functie wordt geïmplementeerd en aangeroepen:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
De declaratieve pijplijnruntime van Lakeflow Spark voert de volgende stappen uit telkens wanneer de pijplijn met de create_auto_cdc_from_snapshot_flow() functie wordt geactiveerd:
- Hiermee wordt de
next_snapshot_and_versionfunctie uitgevoerd om het volgende dataframe voor momentopnamen en de bijbehorende momentopnameversie te laden. - Als er geen DataFrame wordt geretourneerd, wordt de uitvoering beëindigd en wordt de pijplijnupdate gemarkeerd als voltooid.
- Detecteert de wijzigingen in de nieuwe momentopname en past deze incrementeel toe op de doeltabel.
- Keert terug naar stap 1 om de volgende momentopname en de bijbehorende versie te laden.