Condividi tramite


Sovrascrivere in modo selettivo i dati con Delta Lake

Azure Databricks sfrutta la funzionalità Delta Lake per supportare due opzioni distinte per sovrascrizioni selettive:

  • L'opzione replaceWhere sostituisce atomicamente tutti i record che corrispondono a un predicato specificato.
  • È possibile sostituire le directory dei dati in base al modo in cui le tabelle vengono partizionate usando sovrascrizioni delle partizioni dinamiche.

Per la maggior parte delle operazioni, Databricks consiglia di usare replaceWhere per specificare i dati da sovrascrivere.

Important

Se i dati sono stati accidentalmente sovrascritti, è possibile usare il ripristino per annullare la modifica.

Sovrascrittura selettiva arbitraria con replaceWhere

È possibile sovrascrivere in modo selettivo solo i dati corrispondenti a un'espressione arbitraria.

Note

SQL richiede Databricks Runtime 12.2 LTS o versione successiva.

Il comando seguente sostituisce in modo atomico gli eventi di gennaio nella tabella di destinazione, partizionata da start_date, con i dati in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Questo codice di esempio scrive i dati in replace_data, convalida che tutte le righe corrispondano al predicato ed esegue una sostituzione atomica usando overwrite la semantica. Se i valori nell'operazione non rientrano nel vincolo, l'operazione ha esito negativo e viene generato un errore per impostazione predefinita.

È possibile modificare questo comportamento impostando overwrite i valori all'interno dell'intervallo di predicati e insert dei record che non rientrano nell'intervallo specificato. A tale scopo, disabilitare il controllo del vincolo impostando su spark.databricks.delta.replaceWhere.constraintCheck.enabled false usando una delle impostazioni seguenti:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamento legacy

Il comportamento predefinito legacy ha replaceWhere sovrascritto i dati corrispondenti a un predicato solo sulle colonne di partizione. Con questo modello legacy, il comando seguente sostituisce in modo atomico il mese di gennaio nella tabella di destinazione, partizionato da date, con i dati in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

Se si vuole eseguire il fallback al comportamento precedente, è possibile disabilitare il spark.databricks.delta.replaceWhere.dataColumns.enabled flag:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Sovrascrive la partizione dinamica

La sovrascrittura dinamica delle partizioni aggiorna solo le partizioni in cui la scrittura comporta il commit di nuovi dati. Sovrascrive tutti i dati esistenti in tali partizioni e lascia invariati altri.

Azure Databricks supporta due approcci:

  • REPLACE USING (scelta consigliata): funziona in tutti i tipi di calcolo, inclusi databricks SQL warehouse, calcolo serverless e calcolo classico. Non richiede l'impostazione di una configurazione di sessione Spark.
  • partitionOverwriteMode (legacy) - Richiede il calcolo classico e l'impostazione di una configurazione di sessione Spark. Non supportato nel calcolo SQL o serverless di Databricks.

Le sezioni seguenti illustrano come usare ogni approccio.

Sovrascrittura dinamica delle partizioni con REPLACE USING

Databricks Runtime 16.3 e versioni successive supporta sovrascrizioni di partizione dinamiche per le tabelle partizionate usando REPLACE USING. Questo metodo consente di sovrascrivere in modo selettivo i dati in tutti i tipi di calcolo, senza dover impostare una configurazione di sessione Spark. REPLACE USING abilita il comportamento di sovrascrittura atomica indipendente dal calcolo che funziona su Databricks SQL Warehouse, calcolo serverless e classico.

REPLACE USING sovrascrive solo le partizioni di destinazione dei dati in ingresso. Tutte le altre partizioni rimangono invariate.

Nell'esempio seguente viene illustrato come usare la sovrascrittura della partizione dinamica con REPLACE USING. Attualmente, è possibile usare solo SQL, non Python o Scala. Per informazioni dettagliate, vedere INSERT le informazioni di riferimento sul linguaggio SQL.

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Tenere presenti i vincoli e i comportamenti seguenti per sovrascrivere le partizioni dinamiche:

  • È necessario specificare il set completo delle colonne di partizione della tabella nella USING clausola .
  • Verificare sempre che i dati scritti tocchino solo le partizioni previste. Una singola riga nella partizione errata può sovrascrivere involontariamente l'intera partizione.

Se hai bisogno di una logica di corrispondenza più personalizzabile rispetto a quella supportata da REPLACE USING, ad esempio trattando i valori di NULL come uguali, utilizza invece la logica complementare REPLACE ON. Per informazioni dettagliate, vedere INSERT.

Sovrascrittura della partizione dinamica con partitionOverwriteMode (legacy)

Important

Questa funzionalità è in Anteprima Pubblica.

Databricks Runtime 11.3 LTS e versioni successive supportano le sovrascrizioni delle partizioni dinamiche per le tabelle partizionate usando la modalità di sovrascrittura: INSERT OVERWRITE in SQL o una scrittura del DataFrame con df.write.mode("overwrite"). Questo tipo di sovrascrittura è disponibile solo per il calcolo classico, non per databricks SQL warehouse o per il calcolo serverless.

Configurare la modalità di sovrascrittura della partizione dinamica impostando la configurazione spark.sql.sources.partitionOverwriteMode della sessione Spark su dynamic. In alternativa, è possibile impostare l'opzione DataFrameWriterpartitionOverwriteMode su dynamic. Se presente, l'opzione specifica della query sostituisce la modalità definita nella configurazione della sessione. L'impostazione predefinita per spark.sql.sources.partitionOverwriteMode è static.

L'esempio seguente illustra l'uso di partitionOverwriteMode:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Tenere presenti i vincoli e i comportamenti seguenti per partitionOverwriteMode:

  • Non è possibile impostare overwriteSchema su true.
  • Non è possibile specificare sia partitionOverwriteMode che replaceWhere nella stessa operazione DataFrameWriter.
  • Se si specifica una replaceWhere condizione usando un'opzione DataFrameWriter , Delta Lake applica tale condizione per controllare quali dati vengono sovrascritti. Questa opzione ha la precedenza sulla partitionOverwriteMode configurazione a livello di sessione.
  • Verificare sempre che i dati scritti tocchino solo le partizioni previste. Una singola riga nella partizione errata può sovrascrivere involontariamente l'intera partizione.