Delen via


create_auto_cdc_from_snapshot_flow

Belangrijk

Deze functionaliteit bevindt zich in openbare preview.

Met de create_auto_cdc_from_snapshot_flow functie wordt een stroom gecreëerd die gebruikmaakt van de Lakeflow Spark declaratieve pijplijnen en de functionaliteit voor het vastleggen van gegevenswijzigingen (CDC) om brondata van momentopnamen van databases te verwerken. Zie Hoe wordt CDC geïmplementeerd met de AUTO CDC FROM SNAPSHOT API?

Opmerking

Deze functie vervangt de vorige functie apply_changes_from_snapshot(). De twee functies hebben dezelfde handtekening. Databricks raadt het bijwerken aan om de nieuwe naam te gebruiken.

Belangrijk

Voor deze bewerking moet u een gerichte streamingstabel hebben.

Als u de vereiste doeltabel wilt maken, kunt u de functie create_streaming_table() gebruiken.

Syntaxis

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Opmerking

Voor AUTO CDC FROM SNAPSHOT verwerking is het standaardgedrag het invoegen van een nieuwe rij wanneer een overeenkomende record met dezelfde sleutel(en) niet in het doel bestaat. Als er wel een overeenkomende record bestaat, wordt deze alleen bijgewerkt als een van de waarden in de rij is gewijzigd. Rijen met sleutels die aanwezig zijn in de doeldatabase, maar die niet meer aanwezig zijn in de brondatabase, worden verwijderd.

Zie de AUTO CDC-API's voor meer informatie over CDC-verwerking met momentopnamen: Het vastleggen van wijzigingsgegevens vereenvoudigen met pijplijnen. Zie de voorbeelden van periodieke create_auto_cdc_from_snapshot_flow() en historische momentopnameopnamen voor voorbeelden van het gebruik van de functie.

Parameterwaarden

Kenmerk Typologie Description
target str Verplicht. De naam van de tabel die moet worden bijgewerkt. U kunt de functie create_streaming_table() gebruiken om de doeltabel te maken voordat u de create_auto_cdc_from_snapshot_flow() functie uitvoert.
source str of lambda function Verplicht. Ofwel de naam van een tabel of weergave om periodiek een momentopname te maken of een Python-lambda-functie die de dataframe voor momentopnamen retourneert die moet worden verwerkt en de versie van de momentopname. Zie Het argument implementerensource.
keys list Verplicht. De kolom of combinatie van kolommen waarmee een rij in de brongegevens uniek wordt geïdentificeerd. Dit wordt gebruikt om te bepalen welke CDC-gebeurtenissen van toepassing zijn op specifieke records in de doeltabel. U kunt een van de volgende opties opgeven:
  • Een lijst met tekenreeksen: ["userId", "orderId"]
  • Een lijst met Spark SQL-functies col() : [col("userId"), col("orderId"]. Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId).
stored_as_scd_type str of int Om te bepalen of records moeten worden opgeslagen als SCD-type 1 of SCD-type 2. Ingesteld op 1 voor SCD-type 1 of 2 voor SCD-type 2. De standaardwaarde is SCD type 1.
track_history_column_list of track_history_except_column_list list Een subset van uitvoerkolommen om bij te houden voor historische doeleinden in de doeltabel. Hiermee track_history_column_list geeft u de volledige lijst met kolommen op die moeten worden bijgehouden. Hiermee track_history_except_column_list geeft u de kolommen op die moeten worden uitgesloten van het bijhouden. U kunt een waarde declareren als een lijst met tekenreeksen of als Spark SQL-functies col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumenten voor col() functies kunnen geen kwalificaties bevatten. U kunt bijvoorbeeld gebruiken col(userId), maar u kunt het niet gebruiken col(source.userId). De standaardinstelling is om alle kolommen in de doeltabel op te nemen wanneer er geen track_history_column_list of track_history_except_column_list argument wordt doorgegeven aan de functie.

source Het argument implementeren

De create_auto_cdc_from_snapshot_flow() functie bevat het source argument. Voor het verwerken van historische momentopnamen wordt verwacht dat het source argument een Python-lambda-functie is die twee waarden retourneert aan de create_auto_cdc_from_snapshot_flow() functie: een Python DataFrame met de momentopnamegegevens die moeten worden verwerkt en een momentopnameversie.

Hier volgt de handtekening van de lambda-functie:

lambda Any => Optional[(DataFrame, Any)]
  • Het argument voor de lambda-functie is de laatst verwerkte momentopnameversie.
  • De retourwaarde van de lambda-functie is None of een tuple van twee waarden: de eerste waarde van de tuple is een DataFrame dat de momentopname bevat die moet worden verwerkt. De tweede waarde van de tuple is de momentopnameversie die de logische volgorde van de momentopname aangeeft.

Een voorbeeld waarmee de lambda-functie wordt geïmplementeerd en aangeroepen:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

De declaratieve pijplijnruntime van Lakeflow Spark voert de volgende stappen uit telkens wanneer de pijplijn met de create_auto_cdc_from_snapshot_flow() functie wordt geactiveerd:

  1. Hiermee wordt de next_snapshot_and_version functie uitgevoerd om het volgende dataframe voor momentopnamen en de bijbehorende momentopnameversie te laden.
  2. Als er geen DataFrame wordt geretourneerd, wordt de uitvoering beëindigd en wordt de pijplijnupdate gemarkeerd als voltooid.
  3. Detecteert de wijzigingen in de nieuwe momentopname en past deze incrementeel toe op de doeltabel.
  4. Keert terug naar stap 1 om de volgende momentopname en de bijbehorende versie te laden.