Freigeben über


Arbeiten mit Tabellenverlauf

Jeder Vorgang, der eine Tabelle ändert, erstellt eine neue Tabellenversion. Verwenden Sie Verlaufsinformationen zum Überwachen von Vorgängen, zum Rollback einer Tabelle oder zum Abfragen einer Tabelle zu einem bestimmten Zeitpunkt mithilfe von Zeitreisen.

Hinweis

Databricks empfiehlt die Verwendung des Tabellenverlaufs nicht als langfristige Sicherungslösung für die Datenarchivierung. Verwenden Sie nur die letzten 7 Tage für Zeitreisevorgänge, es sei denn, Sie haben sowohl Daten- als auch Protokollaufbewahrungskonfigurationen auf einen größeren Wert festgelegt.

Tabellenverlauf abrufen

Rufen Sie Informationen einschließlich der Vorgänge, des Benutzers und des Zeitstempels für jeden Schreibvorgang in eine Tabelle ab, indem Sie den history Befehl ausführen. Die Vorgänge werden in umgekehrter chronologischer Reihenfolge zurückgegeben.

Die Beibehaltung des Tabellenverlaufs wird durch die Tabelleneinstellung logRetentionDurationbestimmt, die standardmäßig auf 30 Tage festgelegt ist.

Hinweis

Zeitreisen und Tabellenverlauf werden durch unterschiedliche Aufbewahrungsschwellenwerte gesteuert. Sehen Sie sich an, was ist Zeitreise?.

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

Details zur Spark SQL-Syntax finden Sie unter DESCRIBE HISTORY.

Details zur Syntax von Scala, Java und Python finden Sie in der Dokumentation zur Delta Lake-API.

Der Katalog-Explorer bietet eine visuelle Ansicht dieser detaillierten Tabelleninformationen und des Verlaufs. Sie können auf die Registerkarte Verlauf klicken, um zusätzlich zu Tabellenschema und Beispieldaten den mit DESCRIBE HISTORY angezeigten Tabellenverlauf zu sehen.

Verlaufsschema

Die Ausgabe des Vorgangs history enthält die folgenden Spalten.

Kolumne Typ BESCHREIBUNG
Ausgabe lang Die vom Vorgang generierte Tabellenversion.
Zeitstempel Zeitstempel Zeitpunkt, zu dem diese Version committet wurde.
Benutzer-ID Zeichenfolge ID des Benutzers, der den Vorgang ausgeführt hat.
Nutzername Zeichenfolge Name des Benutzers, der den Vorgang ausgeführt hat.
Vorgang Zeichenfolge Name des Vorgangs.
Betriebsparameter Karte Parameter des Vorgangs (z. B. Prädikate)
Auftrag Struktur Details des Auftrags, der den Vorgang ausgeführt hat.
Notebook Struktur Details des Notebooks, aus dem der Vorgang ausgeführt wurde.
clusterId Zeichenfolge ID des Clusters, auf dem der Vorgang ausgeführt wurde.
Version lesen lang Version der Tabelle, die gelesen wurde, um den Schreibvorgang auszuführen.
Isolationsebene Zeichenfolge Für diesen Vorgang verwendete Isolationsstufe.
isBlindAppend Boolescher Wert Gibt an, ob dieser Vorgang Daten angefügt hat.
operationMetrics Karte Metriken des Vorgangs (z. B. Anzahl von geänderten Zeilen und Dateien)
UserMetadata Zeichenfolge Benutzerdefinierte Commit-Metadaten, wenn sie angegeben wurden
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

Hinweis

Verständnis partitionBy für Betriebsparameter

Das partitionBy Feld ist nur für CREATE- und OVERWRITE-Vorgänge sinnvoll, die das Partitionsschema einer Tabelle definieren oder ändern.

Bei Anfügevorgängen an vorhandene Tabellen (APPEND, INSERT, , UPDATEDELETE, MERGE) kann in diesem Feld je nach verwendeter Schreibmethode ([] vs .save()) ein leeres Array .saveAsTable() oder Partitionsspalten angezeigt werden. Diese Inkonsistenz ist ein erwartetes Verhalten und sollte nicht zum Überprüfen von Schreibvorgängen verwendet werden.

Wichtig

Verlassen Sie sich nicht auf den Verlauf von partitionBy, um Anfügevorgänge zu validieren. Der Wert variiert je nach Implementierungsdetails, wirkt sich jedoch nicht darauf aus, wie Daten in Partitionen geschrieben werden.

Example

Betrachten Sie eine Tabelle, die von der date Spalte partitioniert wird:

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

Der CREATE-Vorgang im Verlauf zeigt Folgendes:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

Wenn Sie Daten an diese Tabelle anfügen:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

Der APPEND-Vorgang zeigt:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

Es wird erwartet, dass der partitionBy Wert leer ist. Die Daten werden weiterhin basierend auf dem vorhandenen Partitionsschema der Tabelle in die richtigen Partitionen geschrieben. Beachten Sie, dass bei einem Pfad Partitionsspalten in diesem Feld angezeigt werden, aber dieser Unterschied ist ein Implementierungsdetail und wirkt sich nicht auf das Schreibverhalten aus.

Vorgangsmetriken

Der Vorgang history gibt eine Auflistung von Vorgangsmetriken in der Spaltenzuordnung operationMetrics zurück.

In den folgenden Tabellen sind die Zuordnungsschlüsseldefinitionen nach Vorgang aufgeführt.

Vorgang Metrikname BESCHREIBUNG
SCHREIBEN, CREATE TABLE ALS SELECT, ERSETZEN TABLE ALS SELECT, COPY INTO
numFiles Anzahl von geschriebenen Dateien.
numOutputBytes Größe der geschriebenen Inhalte in Bytes.
numOutputRows Anzahl von geschriebenen Zeilen.
STRÖMEND UPDATE
Anzahl hinzugefügter Dateien Anzahl von hinzugefügten Dateien.
AnzahlGelöschterDateien Anzahl von entfernten Dateien.
numOutputRows Anzahl von geschriebenen Zeilen.
numOutputBytes Größe des Schreibzugriffs in Bytes.
Löschen
Anzahl hinzugefügter Dateien Anzahl von hinzugefügten Dateien. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden.
AnzahlGelöschterDateien Anzahl von entfernten Dateien.
numDeletedRows Anzahl von entfernten Zeilen. Wird nicht angegeben, wenn Partitionen der Tabelle gelöscht werden.
AnzahlKopierteZeilen Anzahl der beim Löschen von Dateien kopierten Zeilen.
AusführungszeitMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
Scan-Zeit in ms Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
UmschreibzeitMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
ABSCHNEIDEN
AnzahlGelöschterDateien Anzahl von entfernten Dateien.
AusführungszeitMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
VERSCHMELZEN
numSourceRows Anzahl von Zeilen im Quelldatenrahmen (Quell-DataFrame).
numTargetRowsInserted Anzahl der in die Zieltabelle eingefügten Zeilen.
AnzahlZielzeilenAktualisiert Anzahl der in der Zieltabelle aktualisierten Zeilen.
numTargetRowsDeleted Anzahl der in der Zieltabelle gelöschten Zeilen.
AnzahlZielzeilenKopiert Anzahl von kopierten Zielzeilen.
numOutputRows Gesamtanzahl von ausgeschriebenen Zeilen.
AnzahlZieldateienHinzugefügt Anzahl von Dateien, die der Senke (Ziel) hinzugefügt wurden.
AnzahlZielDateienEntfernt Anzahl von Dateien, die aus der Senke (Ziel) entfernt wurden.
AusführungszeitMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
Scan-Zeit in ms Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
UmschreibzeitMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
UPDATE
Anzahl hinzugefügter Dateien Anzahl von hinzugefügten Dateien.
AnzahlGelöschterDateien Anzahl von entfernten Dateien.
numUpdatedRows Anzahl von aktualisierten Zeilen.
AnzahlKopierteZeilen Anzahl von Zeilen, die beim Aktualisieren von Dateien gerade kopiert wurden.
AusführungszeitMs Benötigte Zeit für die Ausführung des gesamten Vorgangs.
Scan-Zeit in ms Benötigte Zeit zum Überprüfen der Dateien auf Übereinstimmungen.
UmschreibzeitMs Benötigte Zeit zum erneuten Schreiben der übereinstimmenden Dateien.
FSCK AnzahlGelöschterDateien Anzahl von entfernten Dateien.
UMWANDELN AnzahlKonvertierterDateien Anzahl von Parquet-Dateien, die konvertiert wurden.
OPTIMIZE
Anzahl hinzugefügter Dateien Anzahl von hinzugefügten Dateien.
AnzahlGelöschterDateien Anzahl von optimierten Dateien.
numAddedBytes Anzahl von Bytes, die nach Optimierung der Tabelle hinzugefügt wurden.
AnzahlEntfernterBytes Anzahl von entfernten Bytes.
MinFileSize Größe der kleinsten Datei, nachdem die Tabelle optimiert wurde.
p25FileSize Größe der 25. Perzentildatei, nachdem die Tabelle optimiert wurde.
p50Dateigröße Mediandateigröße, nachdem die Tabelle optimiert wurde.
p75FileSize Größe der 75. Perzentildatei, nachdem die Tabelle optimiert wurde.
maxDateigröße Größe der größten Datei, nachdem die Tabelle optimiert wurde.
KLON
QuellTabellenGröße Größe in Byte der Quelltabelle in der Version, die geklont wird.
Anzahl der Quelldateien Die Anzahl der Dateien in der Quelltabelle in der Version, die geklont wird.
AnzahlGelöschterDateien Die Anzahl der Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Tabelle ersetzt wurde.
entfernteDateienGröße Gesamtgröße in Byte der Dateien, die aus der Zieltabelle entfernt wurden, wenn eine vorherige Tabelle ersetzt wurde.
AnzahlKopierteDateien Anzahl von Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone.
Größe der kopierten Dateien Gesamtgröße in Bytes der Dateien, die an den neuen Speicherort kopiert wurden. „0“ für flache Klone.
RESTORE
tableSizeAfterRestore Tabellengröße in Bytes nach der Wiederherstellung.
AnzahlDerDateienNachWiederherstellung Anzahl von Dateien in der Tabelle nach der Wiederherstellung.
AnzahlGelöschterDateien Anzahl von Dateien, die durch den Wiederherstellungsvorgang entfernt wurden.
AnzahlWiederhergestellterDateien Anzahl von Dateien, die als Ergebnis der Wiederherstellung hinzugefügt wurden.
entfernteDateienGröße Größe in Bytes von Dateien, die durch die Wiederherstellung entfernt wurden.
WiederhergestellteDateigröße Größe in Bytes von Dateien, die durch die Wiederherstellung hinzugefügt wurden.
VACUUM
AnzahlGelöschterDateien Anzahl von gelöschten Dateien.
AnzahlGeleerterVerzeichnisse Anzahl von bereinigten Verzeichnissen.
AnzahlDateienZuLöschen Anzahl von zu löschenden Dateien.

Was ist Zeitreise?

Die Zeitreise unterstützt das Abfragen früherer Tabellenversionen basierend auf der Zeitstempel- oder Tabellenversion (wie im Transaktionsprotokoll aufgezeichnet). Sie können Zeitreisen für Anwendungen wie die folgenden verwenden:

  • Erneutes Erstellen von Analysen, Berichten oder Ausgaben (z. B. der Ausgabe eines Machine Learning-Modells). Dies könnte für das Debuggen oder die Überwachung nützlich sein, insbesondere in regulierten Branchen.
  • Schreiben komplexer temporaler Abfragen.
  • Beheben von Fehlern in Ihren Daten.
  • Bereitstellen der Momentaufnahmenisolation für eine Reihe von Abfragen für sich schnell verändernde Tabellen.

Wichtig

In Databricks Runtime 18.0 und höher werden Zeitreiseabfragen blockiert, wenn sie eine Version anfordern, die älter als die deletedFileRetentionDuration Tabelleneigenschaft ist (Standard 7 Tage). Bei verwalteten Tabellen im Unity-Katalog gilt dies für Databricks Runtime 12.2 und höher.

Zeitreisesyntax

Sie fragen eine Tabelle mit Zeitreise ab, indem Sie eine Klausel nach der Tabellennamenspezifikation hinzufügen.

  • timestamp_expression kann einen der folgenden Werte annehmen:
    • '2018-10-18T22:15:12.013Z', d. h. eine Zeichenfolge, die in einen Zeitstempel umgewandelt werden kann
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', d. h. eine Datumszeichenfolge
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • Jeder andere Ausdruck, der ein Zeitstempel ist oder in einen Zeitstempel umgewandelt werden kann
  • version ist ein LONG-Wert, der aus der Ausgabe von DESCRIBE HISTORY table_spec abgerufen werden kann.

Weder timestamp_expression noch version können Unterabfragen sein.

Es werden nur Datums- oder Zeitstempelzeichenfolgen akzeptiert. Beispiel: "2019-01-01" und "2019-01-01T00:00:00.000Z". Eine Beispielsyntax finden Sie im folgenden Code:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

Sie können auch die Syntax @ verwenden, um den Zeitstempel oder die Version als Teil des Tabellennamens anzugeben. Der Zeitstempel muss im Format yyyyMMddHHmmssSSS vorliegen. Sie können eine Version nach @ angeben, indem Sie der Version v voranstellen. Eine Beispielsyntax finden Sie im folgenden Code:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

Was sind Transaktionsprotokoll-Prüfpunkte?

Tabellenversionen werden als JSON-Dateien im Transaktionsprotokollverzeichnis aufgezeichnet, die zusammen mit Tabellendaten gespeichert werden. Um die Checkpoint-Abfrage zu optimieren, werden Tabellenversionen in Parquet-Prüfpunktdateien aggregiert, um zu vermeiden, dass alle JSON-Versionen des Tabellenverlaufs gelesen werden müssen. Azure Databricks optimiert die Prüfpunkthäufigkeit für Datengröße und Workload. Benutzer sollten nicht direkt mit Prüfpunkten interagieren müssen. Die Prüfpunkthäufigkeit kann sich ohne Ankündigung ändern.

Konfigurieren der Datenaufbewahrung für Zeitreiseabfragen

Um eine frühere Tabellenversion abzufragen, müssen Sie sowohl die Protokoll- als auch die Datendateien für diese Version beibehalten.

Datendateien werden gelöscht, wenn VACUUM gegen eine Tabelle ausgeführt wird. Das Entfernen von Protokolldateien wird nach dem Überprüfen der Tabellenversionen automatisch verwaltet.

Da auf die meisten Tabellen regelmäßig VACUUM ausgeführt wird, sollten Point-in-Time-Abfragen den Aufbewahrungsschwellenwert VACUUM einhalten, der standardmäßig 7 Tage beträgt.

Um den Schwellenwert für die Datenaufbewahrung für Tabellen zu erhöhen, müssen Sie die folgenden Tabelleneigenschaften konfigurieren:

  • delta.logRetentionDuration = "interval <interval>": Steuert, wie lange der Verlauf einer Tabelle aufbewahrt wird. Der Standardwert lautet interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": bestimmt den Schwellenwert, den VACUUM verwendet, um Datendateien zu entfernen, auf die in der aktuellen Tabellenversion nicht mehr verwiesen wird. Der Standardwert lautet interval 7 days.

Sie können Tabelleneigenschaften während der Tabellenerstellung angeben oder mit einer ALTER TABLE Anweisung festlegen. Siehe Referenz zu Tabelleneigenschaften.

Hinweis

In Databricks Runtime 18.0 und höher muss logRetentionDuration größer oder gleich deletedFileRetentionDuration sein. Bei verwalteten Tabellen im Unity-Katalog gilt dies für Databricks Runtime 12.2 und höher.

Stellen Sie delta.deletedFileRetentionDuration = "interval 30 days" ein, um auf 30 Tage Verlaufsdaten zuzugreifen (welches mit der Standardeinstellung von delta.logRetentionDuration übereinstimmt).

Die Erhöhung des Schwellenwerts für die Datenaufbewahrung kann dazu führen, dass Ihre Speicherkosten steigen, da mehr Datendateien aufbewahrt werden.

Wiederherstellen einer Tabelle in einem früheren Zustand

Sie können eine Tabelle mithilfe des RESTORE Befehls in den früheren Zustand wiederherstellen. Tabellen verwalten intern historische Versionen, mit denen sie in einen früheren Zustand wiederhergestellt werden können. Eine Version, die dem früheren Zustand entspricht, oder ein Zeitstempel von der Erstellung des früheren Zustands werden durch den Befehl RESTORE als Optionen unterstützt.

Wichtig

  • Sie können eine bereits wiederhergestellte Tabelle wiederherstellen.
  • Sie können eine geklonte Tabelle wiederherstellen.
  • Sie müssen MODIFY-Berechtigung für die zurzeit wiederhergestellte Tabelle haben.
  • Sie können eine Tabelle nicht auf eine ältere Version wiederherstellen, wenn die Datendateien manuell oder durch vacuum gelöscht wurden. Die teilweise Wiederherstellung auf diese Version ist weiterhin möglich, wenn spark.sql.files.ignoreMissingFiles auf true festgelegt wird.
  • Das Zeitstempelformat für die Wiederherstellung auf einen früheren Zustand lautet yyyy-MM-dd HH:mm:ss. Die Angabe nur einer „date(yyyy-MM-dd)“-Zeichenfolge wird ebenfalls unterstützt.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

Ausführliche Informationen zur Syntax finden Sie unter RESTORE.

Wichtig

Die Wiederherstellung wird als Datenänderungsvorgang betrachtet. Vom Befehl hinzugefügte RESTORE Protokolleinträge enthalten "dataChange", das auf "true" gesetzt ist. Wenn es eine nachgelagerte Anwendung gibt, z. B. einen strukturierten Streamingauftrag , der die Aktualisierungen einer Tabelle verarbeitet, werden die vom Wiederherstellungsvorgang hinzugefügten Datenänderungsprotokolleinträge als neue Datenaktualisierungen betrachtet, und die Verarbeitung kann zu doppelten Daten führen.

Beispiele:

Tabellenversion Vorgang Protokollaktualisierungen Datensätze in Protokollaktualisierungen zu Datenänderungen
0 INSERT AddFile(/Pfad/zur/Datei-1, dataChange = true) (Name = Viktor, Alter = 29, (Name = George, Alter = 55)
1 INSERT AddFile(/Pfad/zur/Datei-2, dataChange = true) (Name = George, Alter = 39)
2 OPTIMIZE AddFile(/Pfad/zur/Datei-3, dataChange = false), RemoveFile(/Pfad/zur/Datei-1), RemoveFile(/Pfad/zur/Datei-2) (Wenn bei der Optimierungskomprimierung keine Datensätze verarbeitet werden, erfolgen keine Änderungen an den Daten in der Tabelle.)
3 RESTORE(Version=1) RemoveFile(/Pfad/zur/Datei-3), AddFile(/Pfad/zur/Datei-1, dataChange = true), AddFile(/Pfad/zur/Datei-2, dataChange = true) (Name = Viktor, Alter = 29), (Name = George, Alter = 55), (Name = George, Alter = 39)

Im vorherigen Beispiel führt der RESTORE Befehl zu Updates, die beim Lesen der Tabellenversion 0 und 1 bereits angezeigt wurden. Wenn diese Tabelle bei einer Streamingabfrage gelesen wird, werden diese Dateien als neu hinzugefügte Daten betrachtet und erneut verarbeitet.

Wiederherstellen von Metriken

RESTORE meldet die folgenden Metriken als einzeiligen Datenrahmen (DataFrame), sobald der Vorgang abgeschlossen ist:

  • table_size_after_restore: Die Größe der Tabelle nach der Wiederherstellung.

  • num_of_files_after_restore Die Anzahl von Dateien in der Tabelle nach der Wiederherstellung.

  • num_removed_files: Die Anzahl von Dateien, die aus der Tabelle entfernt (logisch gelöscht) wurden.

  • num_restored_files: Die Anzahl von Dateien, die aufgrund eines Rollbacks wiederhergestellt wurden.

  • removed_files_size: Die Gesamtgröße in Bytes der aus der Tabelle entfernten Dateien.

  • restored_files_size: Die Gesamtgröße in Bytes der wiederhergestellten Dateien.

    Beispiel der Wiederherstellung von Metriken

Beispiele für die Verwendung von Zeitreisen

  • Korrigieren versehentlicher Löschungen in einer Tabelle für den Benutzer 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • Korrigieren versehentlicher falscher Updates für eine Tabelle:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • Abfragen der Anzahl von Neukunden, die in der letzten Woche gewonnen wurden.

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

Wie finde ich die Version des letzten Commits in der Spark-Sitzung?

Um die Versionsnummer des letzten Commits zu erhalten, der von der aktuellen SparkSession für alle Threads und alle Tabellen geschrieben wurde, fragen Sie die SQL-Konfiguration spark.databricks.delta.lastCommitVersionInSession ab.

Hinweis

Verwenden Sie spark.databricks.iceberg.lastCommitVersionInSession für Apache Iceberg-Tabellen anstelle von spark.databricks.delta.lastCommitVersionInSession.

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Wenn von der SparkSession keine Commits durchgeführt wurden, wird beim Abfragen des Schlüssels ein leerer Wert zurückgegeben.

Hinweis

Wenn Sie dasselbe SparkSession über mehrere Threads hinweg teilen, ist es ähnlich, wie wenn Sie eine Variable über mehrere Threads hinweg teilen; Sie könnten auf Race-Bedingungen stoßen, da der Konfigurationswert parallel aktualisiert wird.