Spalte mit Dateimetadaten

Sie können Metadateninformationen für Eingabedateien mit der Spalte _metadata abrufen. Die Spalte _metadata ist eine versteckte Spalte und steht für alle Eingabedateiformate zur Verfügung. Um die _metadata Spalte in den zurückgegebenen DataFrame einzuschließen, müssen Sie sie explizit in der Leseabfrage auswählen, in der Sie die Quelle angeben.

Wenn die Datenquelle eine Spalte mit dem Namen _metadata enthält, geben Abfragen die Spalte aus der Datenquelle zurück, und nicht die Dateimetadaten.

Warnung

Neue Felder können der Spalte _metadata in zukünftigen Releases hinzugefügt werden. Um Fehler bei der Schemaentwicklung zu verhindern, wenn die Spalte _metadata aktualisiert wird, empfiehlt Databricks, bestimmte Felder der Spalte in Ihren Abfragen auszuwählen. Weitere Informationen finden Sie unter Beispiele.

Unterstützte Metadaten

Die Spalte _metadata ist eine STRUCT mit den folgenden Feldern:

Name Typ Beschreibung Beispiel Mindestversion von Databricks Runtime
file_path STRING Dateipfad zur Eingabedatei file:/tmp/f0.csv 10,5
file_name STRING Name der Eingabedatei zusammen mit der Erweiterung f0.csv 10,5
file_size LONG Länge der Eingabedatei in Byte 628 10,5
Dateiänderungszeit TIMESTAMP Zeitstempel der letzten Änderung der Eingabedatei 2021-12-20 20:05:21 10,5
file_block_start LONG Startoffset des gelesenen Blocks in Bytes. 0 13,0
Dateiblocklänge LONG Länge des gelesenen Blocks in Bytes. 628 13,0

Informationen zum Abrufen zusätzlicher Eigenschaften auf Cloudobjektebene finden Sie in der Spalte "Objektmetadaten".

Beispiele

Verwenden in einem einfachen dateibasierten Datenquellenleser

Python

df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*") \
  .select("*", "_metadata")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "/Volumes/catalog_name/            |
| Debbie  | 18  |      schema_name/volume_name/data/f0.csv",         |
|         |     |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "/Volumes/catalog_name/            |
| Frank   | 24  |      schema_name/volume_name/data/f1.csv",         |
|         |     |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''

Scala

val df = spark.read
  .format("csv")
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*")
  .select("*", "_metadata")

display(df)

/* Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "/Volumes/catalog_name/            |
| Debbie  | 18  |      schema_name/volume_name/data/f0.csv",         |
|         |     |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "/Volumes/catalog_name/            |
| Frank   | 24  |      schema_name/volume_name/data/f1.csv",         |
|         |     |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 10,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
*/

Auswählen bestimmter Felder

Python

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*") \
  .select("_metadata.file_name", "_metadata.file_size")

Scala

spark.read
  .format("csv")
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*")
  .select("_metadata.file_name", "_metadata.file_size")

Verwenden in Filtern

Python

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*") \
  .select("*") \
  .filter(col("_metadata.file_name") == lit("test.csv"))

Scala

spark.read
  .format("csv")
  .schema(schema)
  .load("/Volumes/catalog_name/schema_name/volume_name/data/*")
  .select("*")
  .filter(col("_metadata.file_name") === lit("test.csv"))

Verwendung in COPY INTO (veraltet)

COPY INTO my_delta_table
FROM (
  SELECT *, _metadata FROM 'abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData'
)
FILEFORMAT = CSV

Verwenden im Autoloader

Wenn die Quelldaten eine Spalte mit dem Namen _metadata enthalten, benennen Sie sie in source_metadata um. Wenn Sie sie nicht umbenennen, können Sie nicht auf die Dateimetadatenspalte in der Zieltabelle zugreifen. Abfragen geben stattdessen die Quellspalte zurück.

Python

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .selectExpr("*", "_metadata as source_metadata") \
  .writeStream \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)

Scala

spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(schema)
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
  .selectExpr("*", "_metadata as source_metadata")
  .writeStream
  .option("checkpointLocation", checkpointLocation)
  .start(targetTable)

Wenn Sie foreachBatch verwenden und die Dateimetadatenspalte in das Streaming DataFrame aufnehmen möchten, müssen Sie im Streaming-Lesedatenframe vor der foreachBatch Funktion darauf verweisen. Wenn Sie nur auf die Dateimetadatenspalte innerhalb der foreachBatch Funktion verweisen, ist die Spalte nicht enthalten.

Python

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .select("*", "metadata") \
  .writeStream \
  .foreachBatch(...)

Scala

spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
  .select("*", "metadata")
  .writeStream
  .foreachBatch(...)