Auto Loader で schema の推論と進化を設定する
読み込まれたデータの schema を自動的に検出するように自動ローダーを構成できます。これにより、データ schema を明示的に宣言せずに tables を初期化し、新しい columns が導入されると tableschema を進化させることができます。 これにより、時間の経過に伴う変更 schema 手動で追跡して適用する必要がなくなります。
自動ローダーでは、JSON BLOB
schema 推論と進化では、次の形式がサポートされています。
ファイル形式 | サポートされているバージョン |
---|---|
JSON |
すべてのバージョン |
CSV |
すべてのバージョン |
XML |
Databricks Runtime 14.3 LTS 以降 |
Avro |
Databricks Runtime 10.4 LTS 以降 |
Parquet |
Databricks Runtime 11.3 LTS 以降 |
ORC |
サポートされていない |
Text |
適用できません (固定schema) |
Binaryfile |
適用できません (固定schema) |
schema 推論と進化の構文
オプション cloudFiles.schemaLocation
のターゲット ディレクトリを指定すると、schema 推論と進化が可能になります。 checkpointLocation
に指定したのと同じディレクトリを使用することを選択できます。 Delta Live
注意
ターゲット tableに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。
次の例では、parquet
に cloudFiles.format
を使用します。 他のファイル ソースには、csv
、avro
、または json
を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
自動ローダー schema推論のしくみ
自動ローダーは、初めてデータを読み込んだときにschemaを推論するため、検出された最初の 50 GB と 1000 ファイルのうち、先にlimitを超えた方をサンプリングします。 自動ローダーは、schema 情報を構成された cloudFiles.schemaLocation
に _schemas
ディレクトリに格納し、時間の経過と同時に入力データ schema 変更を追跡します。
注意
使用するサンプルのサイズを変更するには、SQL 構成を set します。
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(バイト文字列 (例: 10gb
))
and
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
既定では、自動ローダー schema の推論は、型の不一致による schema の進化の問題を回避することを目指します。 データ型 (JSON、CSV、XML) をエンコードしない形式の場合、自動ローダーはすべての columns を文字列 (JSON ファイル内の入れ子になったフィールドを含む) として推論します。 型指定された schema (Parquet および Avro) の形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作は、次の tableにまとめられています。
ファイル形式 | 既定の推論されたデータ型 |
---|---|
JSON |
String |
CSV |
String |
XML |
String |
Avro |
Avro schema でエンコードされた型 |
Parquet |
Parquet schemaでエンコードされた型 |
Apache Spark DataFrameReader では、schema 推論にさまざまな動作が使用され、サンプル データに基づいて JSON、CSV、および XML ソースの columns のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypes
を true
に set します。
注意
CSV データの schema を推論する場合、自動ローダーでは、ファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false")
を指定します。 さらに、自動ローダーによって、サンプル内のすべてのファイルのスキーマがマージされ、グローバル schemaが作成されます。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。
注意
2 つの Parquet ファイルで column のデータ型が異なる場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 schema ヒントを指定すると、自動ローダーは column を指定した型にキャストするのではなく、Parquet リーダーに指定した型として column を読み取るように指示します。 不一致が発生した場合、column は で救助されたデータ columnで救出されます。
自動ローダー schema の進化のしくみ
自動ローダーは、データを処理する際に新しい columns が追加されることを検出します。 自動ローダーが新しい columnを検出すると、ストリームは UnknownFieldException
で停止します。 ストリームがこのエラーをスローする前に、自動ローダーは、データの最新のマイクロバッチに対して schema 推論を実行し、新しい columns を schemaの末尾にマージすることで、schema の場所を最新の schema に更新します。 既存の columns のデータ型は変更されません。
Databricks では、Databricks ジョブ を使用して自動ローダー ストリームを構成し、このような schema の変更後に自動的に再起動することをお勧めします。
自動ローダーでは、schemaの展開について次のモードがサポートされています。これは、cloudFiles.schemaEvolutionMode
オプションでsetします。
モード | 新しい column を読み取る場合の動作 |
---|---|
addNewColumns (既定値) |
ストリームが失敗します。 新しい columns が schemaに追加されます。 既存の columns では、データ型は進化しません。 |
rescue |
Schema は進化せず、schema の変更によってストリームが失敗することはありません。 すべての新しい columns は、復旧されたデータ columnに記録されます。 |
failOnNewColumns |
ストリームが失敗します。 指定された schema が更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。 |
none |
schemaを進化させず、新しい columns は無視され、rescuedDataColumn オプションが setされない限り、データは救助されません。 schemaの変更によりストリームが失敗することはありません。 |
注意
addNewColumns
モードは、schema が指定されていない場合の既定値ですが、schemaを指定する場合は none
が既定値です。 ストリームの schema が提供されている場合、addNewColumns
は許可されませんが、schema を schema ヒントとして指定場合は機能します。
自動ローダー使用時のパーティションの動作
自動ローダーは、データが Hive スタイルのパーティション分割でレイアウトされている場合、データの基になるディレクトリ構造から partitioncolumns を推測しようとします。 たとえば、ファイル パス base_path/event=click/date=2021-04-01/f0.json
は、partitioncolumnsとして date
と event
の推論になります。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive スタイルのパーティション分割が含まれていない場合、partitioncolumns は無視されます。
バイナリ ファイル (binaryFile
) 形式と text
ファイル形式には固定データ スキーマがありますが、partitioncolumn 推論がサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation
設定をお勧めします。 これにより、潜在的なエラーや情報の損失を回避し、自動ローダーが開始されるたびにパーティション columns の推論を防ぐことができます。
Partitioncolumns は、schema 進化には考慮されません。 base_path/event=click/date=2021-04-01/f0.json
のような初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.json
として新しいファイルの受信を開始した場合、自動ローダーは時間 columnを無視します。 新しいpartitioncolumnsの情報を取得するには、cloudFiles.partitionColumns
を event,date,hour
にsetします。
注意
cloudFiles.partitionColumns
オプションは、column名のコンマ区切りlistを受け取ります。 ディレクトリ構造に key=value
ペアとして存在する columns のみが解析されます。
救助されたデータ columnは何ですか?
自動ローダーが schemaを推測すると、救助されたデータ column が自動的に _rescued_data
として schema に追加されます。 オプション rescuedDataColumn
を設定すると、columnの名前を変更したり、schemaを提供する場合にそれを含めたりできます。where
復旧されたデータ column により、schema と一致しない columns が削除されるのではなく、確実に救助されます。 復旧されたデータ column には、次の理由で解析されていないデータが含まれています。
- schemaに column がありません。
- 型が一致しない。
- 大文字と小文字が一致しない。
復旧されたデータ column には、復旧された columns とレコードのソース ファイル パスを含む JSON が含まれています。
注意
JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVE
、DROPMALFORMED
、FAILFAST
の 3 つのモードがサポートされます。 rescuedDataColumn
と共に使用すると、データ型の不一致によって、DROPMALFORMED
モードでレコードが削除されたり、FAILFAST
モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath
を使用する場合、rescuedDataColumn
を使用すると、データ型の不一致は無効なレコードとは見なされません。 badRecordsPath
には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。
大文字と小文字が区別される動作を変更する
大文字と小文字の区別が有効になっていない限り、columnsabc
、Abc
、および ABC
は、schema の推論の目的で同じ column と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。 schema ヒント を使用して、どのケースを使用するかを強制できます。 選択が行われてschemaが推論されると、自動ローダーでは、大文字と小文字の区別で選ばれなかった方は、schemaと一致するとは見なされなくなります。
復旧されたデータ readerCaseSensitive
を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。
schema 推論を schema ヒントでオーバーライドする
schema ヒントを使うと、わかっていて望ましいschema情報を、推論されたschemaに適用できます。 column が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、integer
ではなく double
) を選択する場合は、SQL schema 仕様構文を使用して、column データ型の任意の数のヒントを文字列として指定できます。次に例を示します。
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
サポートされているデータ型の list については、データ型の に関するドキュメントを参照してください。
ストリームの開始時に column が存在しない場合は、schema ヒントを使用して、推論された schemaにその column を追加することもできます。
schema ヒントを使用して動作を確認するために推論された schema の例を次に示します。
推論されたschema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
次のschema ヒントを指定します。
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
getされるもの:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
注意
配列とマップの schema ヒントのサポートは、Databricks Runtime 9.1 LTS 以降で利用できます。
ここでは、複雑なデータ型を使用して推論された schema の例を示し、schema ヒントを使用して動作を確認します。
推論されたschema:
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
次のschema ヒントを指定します。
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
getされるもの:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
注意
自動ローダーに schema を 提供しない 場合にのみ、Schema ヒントが使用されます。 cloudFiles.inferColumnTypes
が有効か無効かに関係なく、schema ヒントを使用できます。