Sovrascrivere in modo selettivo i dati con Delta Lake

Azure Databricks sfrutta la funzionalità Delta Lake per supportare due opzioni distinte per sovrascrizioni selettive:

  • L'opzione replaceWhere sostituisce atomicamente tutti i record che corrispondono a un predicato specificato.
  • È possibile sostituire le directory dei dati in base al modo in cui le tabelle vengono partizionate usando sovrascrizioni delle partizioni dinamiche.

Per la maggior parte delle operazioni, Databricks consiglia di usare replaceWhere per specificare i dati da sovrascrivere.

Importante

Se i dati sono stati accidentalmente sovrascritti, è possibile usare il ripristino per annullare la modifica.

Sovrascrittura selettiva arbitraria con replaceWhere

È possibile sovrascrivere in modo selettivo solo i dati corrispondenti a un'espressione arbitraria.

Nota

SQL richiede Databricks Runtime 12.2 LTS o versione successiva.

Il comando seguente sostituisce in modo atomico gli eventi di gennaio nella tabella di destinazione, partizionata da start_date, con i dati in replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Questo codice di esempio scrive i dati in replace_data, convalida che tutte le righe corrispondano al predicato ed esegue una sostituzione atomica usando overwrite la semantica. Se i valori nell'operazione non rientrano nel vincolo, l'operazione ha esito negativo e viene generato un errore per impostazione predefinita.

È possibile modificare questo comportamento impostando overwrite i valori all'interno dell'intervallo di predicati e insert dei record che non rientrano nell'intervallo specificato. A tale scopo, disabilitare il controllo del vincolo impostando su spark.databricks.delta.replaceWhere.constraintCheck.enabled false usando una delle impostazioni seguenti:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Comportamento legacy

Il comportamento predefinito legacy ha replaceWhere sovrascritto i dati corrispondenti a un predicato solo sulle colonne di partizione. Con questo modello legacy, il comando seguente sostituisce in modo atomico il mese di gennaio nella tabella di destinazione, partizionato da date, con i dati in df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Se si vuole eseguire il fallback al comportamento precedente, è possibile disabilitare il spark.databricks.delta.replaceWhere.dataColumns.enabled flag:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Sovrascrive la partizione dinamica

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Databricks Runtime 11.3 LTS e versioni successive supporta la modalità di sovrascrittura della partizione dinamica per le tabelle partizionate. Per le tabelle con più partizioni, Databricks Runtime 11.3 LTS e versioni successive supportano solo la sovrascrittura della partizione dinamica se tutte le colonne di partizione sono dello stesso tipo di dati.

Quando si usa la modalità di sovrascrittura della partizione dinamica, le operazioni sovrascrivono tutti i dati esistenti in ogni partizione logica per cui la scrittura esegue il commit di nuovi dati. Tutte le partizioni logiche esistenti per le quali la scrittura non contiene dati rimangono invariati. Questa modalità è applicabile solo quando i dati vengono scritti in modalità di sovrascrittura: INSERT OVERWRITE in SQL o in una scrittura di dataframe con df.write.mode("overwrite").

Configurare la modalità di sovrascrittura della partizione dinamica impostando la configurazione spark.sql.sources.partitionOverwriteMode della sessione Spark su dynamic. È anche possibile abilitare questa opzione impostando l'opzione DataFrameWriterpartitionOverwriteMode su dynamic. Se presente, l'opzione specifica della query sostituisce la modalità definita nella configurazione della sessione. L'impostazione predefinita per partitionOverwriteMode è static.

Importante

Verificare che i dati scritti con la sovrascrittura della partizione dinamica tocchino solo le partizioni previste. Una singola riga nella partizione non corretta può causare la sovrascrittura involontaria di un'intera partizione.

L'esempio seguente illustra l'uso di sovrascrizioni di partizione dinamica:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Nota

  • La sovrascrittura della partizione dinamica è in conflitto con l'opzione replaceWhere per le tabelle partizionate.
    • Se la sovrascrittura della partizione dinamica è abilitata nella configurazione della sessione Spark e replaceWhere viene fornita come DataFrameWriter opzione, Delta Lake sovrascrive i dati in base all'espressione replaceWhere (le opzioni specifiche della query sostituiscono le configurazioni della sessione).
    • Viene visualizzato un errore se le opzioni includono sia la sovrascrittura della partizione dinamica che replaceWhere l'abilitazioneDataFrameWriter.
  • Non è possibile specificare overwriteSchema come true quando si usa la sovrascrittura della partizione dinamica.