File metadata column

You can get metadata information for input files with the _metadata column. The _metadata column is a hidden column and is available for all input file formats. To include the _metadata column in the returned DataFrame, you must explicitly select it in the read query where you specify the source.

If the data source contains a column named _metadata, queries return the column from the data source, and not the file metadata.

Warning

New fields might be added to the _metadata column in future releases. To prevent schema evolution errors if the _metadata column is updated, Databricks recommends selecting specific fields from the column in your queries. See the examples below.

Supported metadata

The _metadata column is a STRUCT containing the following fields:

Name                   | Type      | Description                                       | Example             | Minimum Databricks Runtime release
-----------------------|-----------|---------------------------------------------------|---------------------|------------------------------------
file_path              | STRING    | File path of the input file.                      | file:/tmp/f0.csv    | 10.5
file_name              | STRING    | Name of the input file along with its extension.  | f0.csv              | 10.5
file_size              | LONG      | Length of the input file, in bytes.               | 628                 | 10.5
file_modification_time | TIMESTAMP | Last modification timestamp of the input file.    | 2021-12-20 20:05:21 | 10.5
file_block_start       | LONG      | Start offset of the block being read, in bytes.   | 0                   | 13.0
file_block_length      | LONG      | Length of the block being read, in bytes.         | 628                 | 13.0
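
To see the exact STRUCT shape available in your runtime, you can print the schema of the selected column. A minimal sketch, assuming CSV files under dbfs:/tmp and a predefined schema:

Python

df = spark.read.format("csv").schema(schema).load("dbfs:/tmp/*")

# _metadata stays hidden until referenced; selecting it reveals the STRUCT fields
df.select("_metadata").printSchema()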

Examples

Use in a basic file-based data source reader

Python

df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*", "_metadata")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''

Scala

val df = spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*", "_metadata")

display(df)

/* Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_block_start": 0,                          |
|         |     |    "file_block_length": 12,                        |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
*/

Select specific fields

Python

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("_metadata.file_name", "_metadata.file_size")

Scala

spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("_metadata.file_name", "_metadata.file_size")

Use in filters

Python

from pyspark.sql.functions import col, lit

spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*") \
  .filter(col("_metadata.file_name") == lit("test.csv"))

Scala

import org.apache.spark.sql.functions.{col, lit}

spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*")
  .filter(col("_metadata.file_name") === lit("test.csv"))

Use in COPY INTO (legacy)

COPY INTO my_delta_table
FROM (
  SELECT *, _metadata FROM 'abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData'
)
FILEFORMAT = CSV

Use in Auto Loader

If your source data contains a column named _metadata, rename it to source_metadata. If you don’t rename it, you can’t access the file metadata column in the target table; queries will return the source column instead.

Python

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .selectExpr("*", "_metadata as source_metadata") \
  .writeStream \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)

Scala

spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(schema)
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
  .selectExpr("*", "_metadata as source_metadata")
  .writeStream
  .option("checkpointLocation", checkpointLocation)
  .start(targetTable)
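
Per the warning above, you can also select specific metadata fields in the stream instead of the whole column. A minimal sketch in Python that keeps only the file path; the source_file column name is illustrative:

Python

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .selectExpr("*", "_metadata.file_path AS source_file") \
  .writeStream \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)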

If you use foreachBatch and want to include the file metadata column in the streaming DataFrame, you must reference it in the streaming read DataFrame before the foreachBatch function. If you only reference the file metadata column inside the foreachBatch function, the column is not included.

Python

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .select("*", "metadata") \
  .writeStream \
  .foreachBatch(...)

Scala

spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
  .select("*", "metadata")
  .writeStream
  .foreachBatch(...)
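
For completeness, a minimal Python sketch of how the elided foreachBatch call might be wired up; the write_batch function and the target table name are illustrative:

Python

def write_batch(batch_df, batch_id):
    # _metadata was selected on the streaming DataFrame before foreachBatch,
    # so it arrives here as a regular column
    batch_df.write.mode("append").saveAsTable("my_target_table")  # illustrative table name

spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
  .select("*", "_metadata") \
  .writeStream \
  .option("checkpointLocation", checkpointLocation) \
  .foreachBatch(write_batch) \
  .start()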