XML ファイルの読み取りと書き込み
重要
この機能はパブリック プレビュー段階にあります。
この記事では、XML ファイルの読み取りと書き込みを行う方法について説明します。
XML (Extensible Markup Language) は、テキスト形式のデータの書式設定、保存、共有を行うためのマークアップ言語です。 この言語では、ドキュメントから任意のデータ構造まで、さまざまなデータをシリアル化するための一連のルールが定義されます。
ネイティブな XML ファイル形式のサポートにより、バッチ処理またはストリーミングのための XML データのインジェスト、クエリ、解析が可能になります。 スキーマとデータ型を自動的に推論および進化させることができ、from_xml
などの SQL 式がサポートされ、XML ドキュメントを生成できます。 外部 jar は必要ありません。自動ローダー、read_files
、COPY INTO
とシームレスに連携します。 オプションで、XML スキーマ定義 (XSD) に対して各行レベルの XML レコードを検証できます。
要件
Databricks Runtime 14.3 以降
XML レコードを解析する
XML の仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされるものではありません。 DataFrame
Row
にマップする XML 要素を示すには、rowTag
オプションを指定する必要があります。 rowTag
要素は最上位レベルの struct
になります。 rowTag
の子要素は最上位レベルの struct
のフィールドになります。
このレコードのスキーマを指定することも、自動的に推論させることもできます。 パーサーは rowTag
要素のみ調査するため、DTD と外部エンティティはフィルター処理されません。
次の例は、さまざまな rowTag
オプションを使用した XML ファイルのスキーマ推論と解析を示しています。
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
rowTag
オプションで XML ファイルを "books "として読み取る:
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
出力:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
rowTag
で XML ファイルを "book "として読み取る:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
出力:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
データ ソース オプション
XML のデータ ソース オプションは、次の方法で指定できます。
- 次の
.option/.options
メソッド:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 次の組み込み関数:
- CREATE TABLE USING DATA_SOURCE の
OPTIONS
句
オプションの一覧については、「自動ローダーのオプション」を参照してください。
XSD のサポート
オプションで、XML スキーマ定義 (XSD) によって各行レベルの XML レコードを検証できます。 XSD ファイルは rowValidationXSDPath
オプションで指定されます。 XSD から、指定または推論されたスキーマにそれ以外の影響は及びません。 検証に失敗したレコードは "破損 "とマークされ、オプション セクションで説明されている破損レコード処理モード オプションに基づいて処理されます。
XSDToSchema
を使用して、XSD ファイルから Spark DataFrame スキーマを抽出します。 これは単純型、複合型、シーケンス型のみと、基本的な XSD 機能のみをサポートしています。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
次の表は、XSD データ型から Sparkデータ型への変換を示しています。
XSD データ型 | Spark データ型 |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
$ | ShortType |
IntegerType |
|
$ | LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
入れ子になった XML を解析する
schema_of_xml
と from_xml
を使用して既存の DataFrame 内の文字列値列の XML データを解析することで、スキーマと解析結果を新しい struct
列として返すことができます。 schema_of_xml
と from_xml
への引数として渡される XML データは、1 つの整形式の XML レコードである必要があります。
schema_of_xml
構文
schema_of_xml(xmlStr [, options] )
引数
xmlStr
: 1 つの整形式の XML レコードを指定する STRING 式。options
: ディレクティブを指定する省略可能なMAP<STRING,STRING>
リテラル。
返品
列名が XML 要素名と属性名から派生した、n 個の文字列フィールドを含む構造体の配列定義を保持する STRING。 フィールドの値は、派生した書式付き SQL 型を保持します。
from_xml
構文
from_xml(xmlStr, schema [, options])
引数
xmlStr
: 1 つの整形式の XML レコードを指定する STRING 式。schema
: STRING 式またはschema_of_xml
関数の呼び出し。options
: ディレクティブを指定する省略可能なMAP<STRING,STRING>
リテラル。
返品
スキーマ定義と一致するフィールド名と型を備えた構造体。 スキーマは、コンマで区切られた列名とデータ型のペアとして定義する必要があります (例: CREATE TABLE
)。 データ ソース オプションに示されているほとんどのオプションは、次の例外を除いて適用できます。
rowTag
: XML レコードが 1 つしかないため、rowTag
オプションは適用されません。mode
(既定値:PERMISSIVE
): 解析中に破損したレコードを処理するモードを許可します。PERMISSIVE
: 破損したレコードに合致する場合は、columnNameOfCorruptRecord
によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドをnull
に設定します。 破損したレコードを保持するには、ユーザー定義スキーマでcolumnNameOfCorruptRecord
という名前の文字列型フィールドを設定できます。 スキーマにフィールドがない場合、破損したレコードが解析中に削除されます。 スキーマを推論すると、出力スキーマにcolumnNameOfCorruptRecord
フィールドが暗黙的に追加されます。FAILFAST
: 破損したレコードに合致する場合に、例外をスローします。
構造体の変換
DataFrame と XML の構造上の違いにより、XML データから DataFrame
への変換規則と DataFrame
から XML データへの変換規則がいくつか存在します。 オプション excludeAttribute
を含む場合は、属性の処理を無効にできることに注意してください。
XML から DataFrame への変換
属性: 属性は、見出しのプレフィックス attributePrefix
を含むフィールドとして変換されます。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
次のスキーマを生成します。
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
属性または子要素を含む要素内の文字データ: これらは valueTag
フィールドに解析されます。 文字データが複数ある場合、valueTag
フィールドは array
型に変換されます。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
次のスキーマを生成します。
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame から XML への変換
配列内の配列としての要素: フィールド ArrayType
の要素を ArrayType
とした DataFrame
から XML ファイルを書き込むと、その要素の入れ子になった追加フィールドが含められます。 これは、XML データの読み書きでは起こりませんが、他のソースから読み取られた DataFrame
を書き込むときに起こります。 したがって、XML ファイルの読み取りと書き込みにおけるラウンドトリップと構造は同じですが、他のソースから読み取られた DataFrame
を別の構造になるように書き込むことは可能です。
以下のスキーマを含む DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
以下のデータを含む:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
以下の XML ファイルを生成する:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
DataFrame
内の名前のない配列の要素名はオプション arrayElementName
で指定されます (既定値: item
)。
復旧されたデータ列
復旧されたデータ列により、ETL 中にデータが失われたり、欠落したりすることがなくなります。 復旧されたデータ列を有効にすると、レコード内の 1 つ以上のフィールドに次のいずれかの問題があって解析されなかったデータをキャプチャできます。
- 指定されたスキーマに存在しない
- 指定されたスキーマのデータ型と一致しない
- 指定されたスキーマのフィールド名と大文字と小文字の区別が一致しない
復旧されたデータ列は、復旧された列と、レコードのソース ファイル パスを含む JSON ドキュメントとして返されます。 復旧されたデータ列からソース ファイル パスを削除するには、次の SQL 構成を設定できます。
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
データの読み取り時にオプション rescuedDataColumn
を列名に設定することで、復旧されたデータ列を有効にできます。たとえば、spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
の _rescued_data
です。
XML パーサーでは、レコードを解析するときに、PERMISSIVE
、DROPMALFORMED
、FAILFAST
の 3 つのモードがサポートされます。 rescuedDataColumn
と共に使用すると、データ型の不一致によって、DROPMALFORMED
モードでレコードが削除されたり、FAILFAST
モードでエラーがスローされたりすることはありません。 破損したレコード (不完全であるか形式に誤りがある XML) のみ、削除されたり、エラーがスローされたりします。
自動ローダーでのスキーマの推論と展開
このトピックと適用可能なオプションの詳細については、「自動ローダーでのスキーマの推論と展開の構成」を参照してください。 読み込まれた XML データのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。
既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化の問題を回避します。 データ型 (JSON、CSV、および XML) をエンコードしない形式の場合、自動ローダーは、XML ファイル内の入れ子になったフィールドなどのすべての列を文字列として推論します。 Apache Spark DataFrameReader
では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて XML ソースの列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypes
を true
に設定します。
自動ローダーでは、データを処理する際に新しい列の追加を検出します。 自動ローダーで新しい列が検出されると、ストリームが UnknownFieldException
で停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。 自動ローダーでは、スキーマの展開に関するさまざまなモードがサポートされています。これは、cloudFiles.schemaEvolutionMode
オプションで設定できます。
スキーマ ヒントを使用して、推論されたスキーマに対して知っているか予想されるスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合や、さらに一般的なデータ型 (たとえば、整数ではなく浮動小数点を選択する場合は、SQL スキーマ仕様構文を使用して文字列として、列のデータ型に任意の数のヒントを指定できます。 復旧されたデータ列が有効になっているときは、スキーマの大文字または小文字とは異なる名前が付けられたフィールドが _rescued_data
列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitive
を false
に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。
例
このセクションの例では、[Apache Spark GitHub リポジトリ] でダウンロード可能な XML ファイルを使用しています。
XML の読み取りと書き込み
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
次のデータの読み取り時にスキーマを手動で指定することができます。
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML データ ソースは、次のデータ型を推測できます。
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL で列名と型を指定することもできます。 この場合、スキーマは自動的に推論されません。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO を使用した XML の読み込み
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
行の検証を使用した XML の読み取り
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
入れ子になった XML の解析 (from_xml と schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API を使用した from_xml と schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
自動ローダーを使用した XML の読み込み
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)