File metadata column
You can get metadata information for input files with the _metadata
column. The _metadata
column is a hidden column, and is available for all input file formats. To include the _metadata
column in the returned DataFrame, you must explicitly select it in the read query where you specify the source.
If the data source contains a column named _metadata
, queries return the column from the data source, and not the file metadata.
Warning
New fields might be added to the _metadata
column in future releases. To prevent schema evolution errors if the _metadata
column is updated, Databricks recommends selecting specific fields from the column in your queries. See examples.
Supported metadata
The _metadata
column is a STRUCT
containing the following fields:
Name | Type | Description | Example | Minimum Databricks Runtime release |
---|---|---|---|---|
file_path | STRING |
File path of the input file. | file:/tmp/f0.csv |
10.5 |
file_name | STRING |
Name of the input file along with its extension. | f0.csv |
10.5 |
file_size | LONG |
Length of the input file, in bytes. | 628 | 10.5 |
file_modification_time | TIMESTAMP |
Last modification timestamp of the input file. | 2021-12-20 20:05:21 |
10.5 |
file_block_start | LONG |
Start offset of the block being read, in bytes. | 0 | 13.0 |
file_block_length | LONG |
Length of the block being read, in bytes. | 628 | 13.0 |
Examples
Use in a basic file-based data source reader
Python
df = spark.read \
.format("csv") \
.schema(schema) \
.load("dbfs:/tmp/*") \
.select("*", "_metadata")
display(df)
'''
Result:
+---------+-----+----------------------------------------------------+
| name | age | _metadata |
+=========+=====+====================================================+
| | | { |
| | | "file_path": "dbfs:/tmp/f0.csv", |
| Debbie | 18 | "file_name": "f0.csv", |
| | | "file_size": 12, |
| | | "file_block_start": 0, |
| | | "file_block_length": 12, |
| | | "file_modification_time": "2021-07-02 01:05:21" |
| | | } |
+---------+-----+----------------------------------------------------+
| | | { |
| | | "file_path": "dbfs:/tmp/f1.csv", |
| Frank | 24 | "file_name": "f1.csv", |
| | | "file_size": 12, |
| | | "file_block_start": 0, |
| | | "file_block_length": 12, |
| | | "file_modification_time": "2021-12-20 02:06:21" |
| | | } |
+---------+-----+----------------------------------------------------+
'''
Scala
val df = spark.read
.format("csv")
.schema(schema)
.load("dbfs:/tmp/*")
.select("*", "_metadata")
display(df_population)
/* Result:
+---------+-----+----------------------------------------------------+
| name | age | _metadata |
+=========+=====+====================================================+
| | | { |
| | | "file_path": "dbfs:/tmp/f0.csv", |
| Debbie | 18 | "file_name": "f0.csv", |
| | | "file_size": 12, |
| | | "file_block_start": 0, |
| | | "file_block_length": 12, |
| | | "file_modification_time": "2021-07-02 01:05:21" |
| | | } |
+---------+-----+----------------------------------------------------+
| | | { |
| | | "file_path": "dbfs:/tmp/f1.csv", |
| Frank | 24 | "file_name": "f1.csv", |
| | | "file_size": 10, |
| | | "file_block_start": 0, |
| | | "file_block_length": 12, |
| | | "file_modification_time": "2021-12-20 02:06:21" |
| | | } |
+---------+-----+----------------------------------------------------+
*/
Select specific fields
Python
spark.read \
.format("csv") \
.schema(schema) \
.load("dbfs:/tmp/*") \
.select("_metadata.file_name", "_metadata.file_size")
Scala
spark.read
.format("csv")
.schema(schema)
.load("dbfs:/tmp/*")
.select("_metadata.file_name", "_metadata.file_size")
Use in filters
Python
spark.read \
.format("csv") \
.schema(schema) \
.load("dbfs:/tmp/*") \
.select("*") \
.filter(col("_metadata.file_name") == lit("test.csv"))
Scala
spark.read
.format("csv")
.schema(schema)
.load("dbfs:/tmp/*")
.select("*")
.filter(col("_metadata.file_name") === lit("test.csv"))
Use in COPY INTO (legacy)
COPY INTO my_delta_table
FROM (
SELECT *, _metadata FROM 'abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData'
)
FILEFORMAT = CSV
Use in Auto Loader
If your source data contains a column named _metadata
, rename it to source_metadata
. If you don’t rename it, you can’t access the file metadata column in the target table; queries will return the source column instead.
Python
spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.schema(schema) \
.load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
.selectExpr("*", "_metadata as source_metadata") \
.writeStream \
.option("checkpointLocation", checkpointLocation) \
.start(targetTable)
Scala
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.schema(schema)
.load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
.selectExpr("*", "_metadata as source_metadata")
.writeStream
.option("checkpointLocation", checkpointLocation)
.start(targetTable)
If you use foreachBatch and want to include the file metadata column in the streaming DataFrame, you must reference it in the streaming read DataFrame before the foreachBatch
function. If you only reference the file metadata column inside the foreachBatch
function, the column is not included.
Python
spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData") \
.select("*", "metadata") \
.writeStream \
.foreachBatch(...)
Scala
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("abfss://my-container-name@storage-account-name.dfs.core.windows.net/csvData")
.select("*", "metadata")
.writeStream
.foreachBatch(...)