Condividi tramite


Feed di modifiche con Apache Spark

Il connettore Spark di Azure Cosmos DB consente di elaborare il feed di modifiche su larga scala usando Apache Spark. Il connettore usa Java SDK sottostante e implementa un modello pull che distribuisce l'elaborazione in modo trasparente tra executor Spark, rendendolo ideale per scenari di elaborazione dati su larga scala.

Funzionamento del connettore Spark

Il connettore Spark per Azure Cosmos DB si basa su Azure Cosmos DB Java SDK e implementa un approccio basato sul modello pull per la lettura del feed di modifiche. Le caratteristiche principali includono:

  • Java SDK foundation: usa l'SDK Java di Azure Cosmos DB affidabile sottostante per l'elaborazione affidabile dei feed di modifiche
  • Implementazione del modello pull: segue il modello di pull del feed di modifiche , consentendo di controllare il ritmo di elaborazione
  • Elaborazione distribuita: distribuisce automaticamente l'elaborazione dei feed di modifiche tra più executor Spark per l'elaborazione parallela
  • Scalabilità trasparente: il connettore gestisce il partizionamento e la distribuzione del carico senza richiedere l'intervento manuale

Funzionalità di checkpoint univoca

Uno dei vantaggi principali dell'uso del connettore Spark per l'elaborazione dei feed di modifiche è il meccanismo di checkpoint predefinito. Questa funzione fornisce:

  • Ripristino automatico: meccanismo predefinito per il ripristino durante l'elaborazione del feed di modifiche su larga scala
  • Tolleranza di errore: possibilità di riprendere l'elaborazione dall'ultimo checkpoint in caso di errori
  • Gestione dello stato: mantiene lo stato di elaborazione tra sessioni Spark e riavvii del cluster
  • Scalabilità: supporta il checkpoint in ambienti Spark distribuiti

Questa funzionalità di checkpoint è unica per il connettore Spark e non è disponibile quando si usano direttamente gli SDK, rendendola particolarmente utile per gli scenari di produzione che richiedono disponibilità elevata e affidabilità.

Avvertimento

La spark.cosmos.changeFeed.startFrom configurazione viene ignorata se nella posizione del checkpoint sono presenti segnalibri esistenti. Quando si riprende da un checkpoint, il connettore continuerà dall'ultima posizione elaborata anziché dal punto iniziale specificato.

Quando usare Spark per l'elaborazione del feed di modifiche

Prendere in considerazione l'uso del connettore Spark per l'elaborazione dei feed di modifiche in questi scenari:

  • Elaborazione dei dati su larga scala: quando è necessario elaborare volumi elevati di dati del feed di modifiche che superano le funzionalità a computer singolo
  • Trasformazioni complesse: quando l'elaborazione del feed di modifiche comporta trasformazioni, aggregazioni o join complessi di dati con altri set di dati
  • Analisi distribuita: quando è necessario eseguire analisi in tempo reale o quasi in tempo reale sui dati dei feed di modifiche in un ambiente distribuito
  • Integrazione con le pipeline di dati: quando l'elaborazione del feed di modifiche fa parte di pipeline ETL/ELT più grandi che usano già Spark
  • Requisiti di tolleranza di errore: quando sono necessari meccanismi di checkpoint e ripristino affidabili per i carichi di lavoro di produzione
  • Elaborazione di più contenitori: quando è necessario elaborare i feed di modifiche da più contenitori contemporaneamente

Per scenari più semplici o quando è necessario un controllo granulare sull'elaborazione dei singoli documenti, è consigliabile usare il processore del feed di modifiche o il modello pull direttamente con gli SDK.

Esempi di codice

Gli esempi seguenti illustrano come leggere dal feed di modifiche usando il connettore Spark. Per esempi più completi, vedere i notebook di esempio completi:

# Configure change feed reading

changeFeedConfig = {
    "spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database-name>",
    "spark.cosmos.container": "<container-name>",
    # Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
    "spark.cosmos.changeFeed.startFrom": "Beginning",  # "Now" or "2020-02-10T14:15:03"
    "spark.cosmos.changeFeed.mode": "LatestVersion",  # or "AllVersionsAndDeletes"
    # Control batch size - if not set, all available data processed in first batch
    "spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
    "spark.cosmos.read.partitioning.strategy": "Restrictive"
}

# Read change feed as a streaming DataFrame
changeFeedDF = spark \
    .readStream \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedConfig) \
    .load()

# Configure output settings with checkpointing
outputConfig = {
    "spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<target-account-key>",
    "spark.cosmos.database": "<target-database>",
    "spark.cosmos.container": "<target-container>",
    "spark.cosmos.write.strategy": "ItemOverwrite"
}

# Process and write the change feed data with checkpointing
query = changeFeedDF \
    .selectExpr("*") \
    .writeStream \
    .format("cosmos.oltp") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/changefeed-checkpoint") \
    .options(**outputConfig) \
    .start()

# Wait for the streaming query to finish
query.awaitTermination()

Opzioni di configurazione chiave

Quando si usa il feed di modifiche in Spark, queste opzioni di configurazione sono particolarmente importanti:

  • spark.cosmos.changeFeed.startFrom: controlla dove iniziare a leggere il feed di modifiche
    • "Beginning" - Iniziare dall'inizio del feed di modifiche
    • "Now" - Inizia dall'ora corrente
    • "2020-02-10T14:15:03" - Inizia da un timestamp specifico (formato ISO 8601)
    • Nota: questa impostazione viene ignorata se sono presenti segnalibri esistenti nel percorso del checkpoint
  • spark.cosmos.changeFeed.mode: specifica la modalità feed di modifiche
    • "LatestVersion" - Elaborare solo la versione più recente dei documenti modificati
    • "AllVersionsAndDeletes" - Elaborare tutte le versioni delle modifiche, incluse le eliminazioni
  • spark.cosmos.changeFeed.itemCountPerTriggerHint: controlla le dimensioni di elaborazione batch
    • Numero massimo approssimativo di elementi letti dal feed di modifiche per ogni micro-batch/trigger
    • Esempio: "50000"
    • Importante: se non è impostato, tutti i dati disponibili nel feed di modifiche verranno elaborati nel primo micro-batch
  • checkpointLocation: specifica dove archiviare le informazioni del checkpoint per la tolleranza di errore e il ripristino
  • spark.cosmos.read.partitioning.strategy: controlla la modalità di partizionamento dei dati tra executor Spark

Passaggi successivi