次の方法で共有


Auto Loader で schema の推論と進化を設定する

読み込まれたデータの schema を自動的に検出するように自動ローダーを構成できます。これにより、データ schema を明示的に宣言せずに tables を初期化し、新しい columns が導入されると tableschema を進化させることができます。 これにより、時間の経過に伴う変更 schema 手動で追跡して適用する必要がなくなります。

自動ローダーでは、JSON BLOB で予期しない (たとえば、異なるデータ型の) データを "復旧" することもできます。これは、の半構造化データ アクセス API を使用して後でアクセスすることを選択できます。

schema 推論と進化では、次の形式がサポートされています。

ファイル形式 サポートされているバージョン
JSON すべてのバージョン
CSV すべてのバージョン
XML Databricks Runtime 14.3 LTS 以降
Avro Databricks Runtime 10.4 LTS 以降
Parquet Databricks Runtime 11.3 LTS 以降
ORC サポートされていない
Text 適用できません (固定schema)
Binaryfile 適用できません (固定schema)

schema 推論と進化の構文

オプション cloudFiles.schemaLocation のターゲット ディレクトリを指定すると、schema 推論と進化が可能になります。 checkpointLocation に指定したのと同じディレクトリを使用することを選択できます。 Delta Live 使用すると、Azure Databricks によって の場所やその他のチェックポイント情報が自動的に管理されます。

注意

ターゲット tableに複数のソース データの場所が読み込まれている場合、各自動ローダー インジェスト ワークロードには個別のストリーミング チェックポイントが必要です。

次の例では、parquetcloudFiles.format を使用します。 他のファイル ソースには、csvavro、または json を使用します。 読み取りと書き込みの他のすべての設定の既定の動作は、各形式で同じままです。

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

自動ローダー schema推論のしくみ

自動ローダーは、初めてデータを読み込んだときにschemaを推論するため、検出された最初の 50 GB と 1000 ファイルのうち、先にlimitを超えた方をサンプリングします。 自動ローダーは、schema 情報を構成された cloudFiles.schemaLocation_schemas ディレクトリに格納し、時間の経過と同時に入力データ schema 変更を追跡します。

注意

使用するサンプルのサイズを変更するには、SQL 構成を set します。

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(バイト文字列 (例: 10gb))

and

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

既定では、自動ローダー schema の推論は、型の不一致による schema の進化の問題を回避することを目指します。 データ型 (JSON、CSV、XML) をエンコードしない形式の場合、自動ローダーはすべての columns を文字列 (JSON ファイル内の入れ子になったフィールドを含む) として推論します。 型指定された schema (Parquet および Avro) の形式の場合、自動ローダーはファイルのサブセットをサンプリングし、個々のファイルのスキーマをマージします。 この動作は、次の tableにまとめられています。

ファイル形式 既定の推論されたデータ型
JSON String
CSV String
XML String
Avro Avro schema でエンコードされた型
Parquet Parquet schemaでエンコードされた型

Apache Spark DataFrameReader では、schema 推論にさまざまな動作が使用され、サンプル データに基づいて JSON、CSV、および XML ソースの columns のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypestrueに set します。

注意

CSV データの schema を推論する場合、自動ローダーでは、ファイルにヘッダーが含まれていると見なされます。 CSV ファイルにヘッダーが含まれていない場合は、オプション .option("header", "false") を指定します。 さらに、自動ローダーによって、サンプル内のすべてのファイルのスキーマがマージされ、グローバル schemaが作成されます。 自動ローダーは、ヘッダーに従って各ファイルを読み取り、CSV を正しく解析できます。

注意

2 つの Parquet ファイルで column のデータ型が異なる場合、自動ローダーは最も広い種類を選択します。 schemaHints を使用して、この選択をオーバーライドできます。 schema ヒントを指定すると、自動ローダーは column を指定した型にキャストするのではなく、Parquet リーダーに指定した型として column を読み取るように指示します。 不一致が発生した場合、column は で救助されたデータ columnで救出されます。

自動ローダー schema の進化のしくみ

自動ローダーは、データを処理する際に新しい columns が追加されることを検出します。 自動ローダーが新しい columnを検出すると、ストリームは UnknownFieldExceptionで停止します。 ストリームがこのエラーをスローする前に、自動ローダーは、データの最新のマイクロバッチに対して schema 推論を実行し、新しい columns を schemaの末尾にマージすることで、schema の場所を最新の schema に更新します。 既存の columns のデータ型は変更されません。

Databricks では、Databricks ジョブ を使用して自動ローダー ストリームを構成し、このような schema の変更後に自動的に再起動することをお勧めします。

自動ローダーでは、schemaの展開について次のモードがサポートされています。これは、cloudFiles.schemaEvolutionMode オプションでsetします。

モード 新しい column を読み取る場合の動作
addNewColumns (既定値) ストリームが失敗します。 新しい columns が schemaに追加されます。 既存の columns では、データ型は進化しません。
rescue Schema は進化せず、schema の変更によってストリームが失敗することはありません。 すべての新しい columns は、復旧されたデータ columnに記録されます。
failOnNewColumns ストリームが失敗します。 指定された schema が更新されるか、問題のあるデータ ファイルが削除されない限り、ストリームは再起動しません。
none schemaを進化させず、新しい columns は無視され、rescuedDataColumn オプションが setされない限り、データは救助されません。 schemaの変更によりストリームが失敗することはありません。

注意

addNewColumns モードは、schema が指定されていない場合の既定値ですが、schemaを指定する場合は none が既定値です。 ストリームの schema が提供されている場合、addNewColumns は許可されませんが、schema を schema ヒントとして指定場合は機能します。

自動ローダー使用時のパーティションの動作

自動ローダーは、データが Hive スタイルのパーティション分割でレイアウトされている場合、データの基になるディレクトリ構造から partitioncolumns を推測しようとします。 たとえば、ファイル パス base_path/event=click/date=2021-04-01/f0.json は、partitioncolumnsとして dateevent の推論になります。 基になるディレクトリ構造に競合する Hive パーティションが含まれている場合、または Hive スタイルのパーティション分割が含まれていない場合、partitioncolumns は無視されます。

バイナリ ファイル (binaryFile) 形式と text ファイル形式には固定データ スキーマがありますが、partitioncolumn 推論がサポートされています。 Databricks では、これらのファイル形式に対して cloudFiles.schemaLocation 設定をお勧めします。 これにより、潜在的なエラーや情報の損失を回避し、自動ローダーが開始されるたびにパーティション columns の推論を防ぐことができます。

Partitioncolumns は、schema 進化には考慮されません。 base_path/event=click/date=2021-04-01/f0.jsonのような初期ディレクトリ構造があり、base_path/event=click/date=2021-04-01/hour=01/f1.jsonとして新しいファイルの受信を開始した場合、自動ローダーは時間 columnを無視します。 新しいpartitioncolumnsの情報を取得するには、cloudFiles.partitionColumnsevent,date,hour にsetします。

注意

cloudFiles.partitionColumns オプションは、column名のコンマ区切りlistを受け取ります。 ディレクトリ構造に key=value ペアとして存在する columns のみが解析されます。

救助されたデータ columnは何ですか?

自動ローダーが schemaを推測すると、救助されたデータ column が自動的に _rescued_dataとして schema に追加されます。 オプション rescuedDataColumn を設定すると、columnの名前を変更したり、schemaを提供する場合にそれを含めたりできます。where

復旧されたデータ column により、schema と一致しない columns が削除されるのではなく、確実に救助されます。 復旧されたデータ column には、次の理由で解析されていないデータが含まれています。

  • schemaに column がありません。
  • 型が一致しない。
  • 大文字と小文字が一致しない。

復旧されたデータ column には、復旧された columns とレコードのソース ファイル パスを含む JSON が含まれています。

注意

JSON および CSV パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と共に使用すると、データ型の不一致によって、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーがスローされたりすることはありません。 不完全であるか形式に誤りがある JSON や CSV などの、破損したレコードのみが削除されたり、エラーをスローしたりします。 JSON または CSV を解析するときに badRecordsPath を使用する場合、rescuedDataColumn を使用すると、データ型の不一致は無効なレコードとは見なされません。 badRecordsPath には、不完全で形式に誤りがある JSON または CSV レコードだけが格納されます。

大文字と小文字が区別される動作を変更する

大文字と小文字の区別が有効になっていない限り、columnsabcAbc、および ABC は、schema の推論の目的で同じ column と見なされます。 大文字または小文字の選択は任意であり、サンプリングされたデータによって異なります。 schema ヒント を使用して、どのケースを使用するかを強制できます。 選択が行われてschemaが推論されると、自動ローダーでは、大文字と小文字の区別で選ばれなかった方は、schemaと一致するとは見なされなくなります。

復旧されたデータ 有効にすると、 以外のケースで名前が付けられたフィールドが に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive を false に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。

schema 推論を schema ヒントでオーバーライドする

schema ヒントを使うと、わかっていて望ましいschema情報を、推論されたschemaに適用できます。 column が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、integerではなく double) を選択する場合は、SQL schema 仕様構文を使用して、column データ型の任意の数のヒントを文字列として指定できます。次に例を示します。

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

サポートされているデータ型の list については、データ型の に関するドキュメントを参照してください。

ストリームの開始時に column が存在しない場合は、schema ヒントを使用して、推論された schemaにその column を追加することもできます。

schema ヒントを使用して動作を確認するために推論された schema の例を次に示します。

推論されたschema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

次のschema ヒントを指定します。

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

getされるもの:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意

配列とマップの schema ヒントのサポートは、Databricks Runtime 9.1 LTS 以降で利用できます。

ここでは、複雑なデータ型を使用して推論された schema の例を示し、schema ヒントを使用して動作を確認します。

推論されたschema:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

次のschema ヒントを指定します。

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

getされるもの:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意

自動ローダーに schema を 提供しない 場合にのみ、Schema ヒントが使用されます。 cloudFiles.inferColumnTypes が有効か無効かに関係なく、schema ヒントを使用できます。