Κοινή χρήση μέσω


Configure schema inference and evolution in Auto Loader

You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.

Auto Loader can also “rescue” data that was unexpected (for example, of differing data types) in a JSON blob column, that you can choose to access later using the semi-structured data access APIs.

The following formats are supported for schema inference and evolution:

File format Supported versions
JSON All versions
CSV All versions
XML Databricks Runtime 14.3 LTS and above
Avro Databricks Runtime 10.4 LTS and above
Parquet Databricks Runtime 11.3 LTS and above
ORC Unsupported
Text Not applicable (fixed-schema)
Binaryfile Not applicable (fixed-schema)

Syntax for schema inference and evolution

Specifying a target directory for the option cloudFiles.schemaLocation enables schema inference and evolution. You can choose to use the same directory you specify for the checkpointLocation. If you use Delta Live Tables, Azure Databricks manages schema location and other checkpoint information automatically.

Note

If you have more than one source data location being loaded into the target table, each Auto Loader ingestion workload requires a separate streaming checkpoint.

The following example uses parquet for the cloudFiles.format. Use csv, avro, or json for other file sources. All other settings for read and write stay the same for the default behaviors for each format.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

How does Auto Loader schema inference work?

To infer the schema when first reading data, Auto Loader samples the first 50 GB or 1000 files that it discovers, whichever limit is crossed first. Auto Loader stores the schema information in a directory _schemas at the configured cloudFiles.schemaLocation to track schema changes to the input data over time.

Note

To change the size of the sample that’s used you can set the SQL configurations:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(byte string, for example 10gb)

and

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(integer)

By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don’t encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings (including nested fields in JSON files). For formats with typed schema (Parquet and Avro), Auto Loader samples a subset of files and merges the schemas of individual files. This behavior is summarized in the following table:

File format Default inferred data type
JSON String
CSV String
XML String
Avro Types encoded in Avro schema
Parquet Types encoded in Parquet schema

The Apache Spark DataFrameReader uses different behavior for schema inference, selecting data types for columns in JSON, CSV, and XML sources based on sample data. To enable this behavior with Auto Loader, set the option cloudFiles.inferColumnTypes to true.

Note

When inferring the schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option .option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly.

Note

When a column has different data types in two Parquet files, Auto Loader chooses the widest type. You can use schemaHints to override this choice. When you specify schema hints, Auto Loader doesn’t cast the column to the specified type, but rather tells the Parquet reader to read the column as the specified type. In the case of a mismatch, the column is rescued in the rescued data column.

How does Auto Loader schema evolution work?

Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged.

Databricks recommends configuring Auto Loader streams with Databricks Jobs to restart automatically after such schema changes.

Auto Loader supports the following modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode:

Mode Behavior on reading new column
addNewColumns (default) Stream fails. New columns are added to the schema. Existing columns do not evolve data types.
rescue Schema is never evolved and stream does not fail due to schema changes. All new columns are recorded in the rescued data column.
failOnNewColumns Stream fails. Stream does not restart unless the provided schema is updated, or the offending data file is removed.
none Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescuedDataColumn option is set. Stream does not fail due to schema changes.

Note

addNewColumns mode is the default when a schema is not provided, but none is the default when you provide a schema. addNewColumns is not allowed when the schema of the stream is provided, but does work if you provide your schema as a schema hint.

How do partitions work with Auto Loader?

Auto Loader attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, the file path base_path/event=click/date=2021-04-01/f0.json results in the inference of date and event as partition columns. If the underlying directory structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning, partition columns are ignored.

Binary file (binaryFile) and text file formats have fixed data schemas, but support partition column inference. Databricks recommends setting cloudFiles.schemaLocation for these file formats. This avoids any potential errors or information loss and prevents inference of partitions columns each time an Auto Loader begins.

Partition columns are not considered for schema evolution. If you had an initial directory structure like base_path/event=click/date=2021-04-01/f0.json, and then start receiving new files as base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignores the hour column. To capture information for new partition columns, set cloudFiles.partitionColumns to event,date,hour.

Note

The option cloudFiles.partitionColumns takes a comma-separated list of column names. Only columns that exist as key=value pairs in your directory structure are parsed.

What is the rescued data column?

When Auto Loader infers the schema, a rescued data column is automatically added to your schema as _rescued_data. You can rename the column or include it in cases where you provide a schema by setting the option rescuedDataColumn.

The rescued data column ensures that columns that don’t match with the schema are rescued instead of being dropped. The rescued data column contains any data that isn’t parsed for the following reasons:

  • The column is missing from the schema.
  • Type mismatches.
  • Case mismatches.

The rescued data column contains a JSON containing the rescued columns and the source file path of the record.

Note

The JSON and CSV parsers support three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records are dropped or throw errors, such as incomplete or malformed JSON or CSV. If you use badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the rescuedDataColumn. Only incomplete and malformed JSON or CSV records are stored in badRecordsPath.

Change case-sensitive behavior

Unless case sensitivity is enabled, the columns abc, Abc, and ABC are considered the same column for the purposes of schema inference. The case that is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader does not consider the casing variants that were not selected consistent with the schema.

When rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. Change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.

Override schema inference with schema hints

You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax, such as the following:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

See the documentation on data types for the list of supported data types.

If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.

Here is an example of an inferred schema to see the behavior with schema hints.

Inferred schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

By specifying the following schema hints:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

you get:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Note

Array and Map schema hints support is available in Databricks Runtime 9.1 LTS and above.

Here is an example of an inferred schema with complex datatypes to see the behavior with schema hints.

Inferred schema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

By specifying the following schema hints:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

you get:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Note

Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether cloudFiles.inferColumnTypes is enabled or disabled.