次の方法で共有


XML ファイルの読み取りと書き込み

重要

この機能はパブリック プレビュー段階にあります。

この記事では、XML ファイルの読み取りと書き込みを行う方法について説明します。

XML (Extensible Markup Language) は、テキスト形式のデータの書式設定、保存、共有を行うためのマークアップ言語です。 この言語では、ドキュメントから任意のデータ構造まで、さまざまなデータをシリアル化するための一連のルールが定義されます。

ネイティブな XML ファイル形式のサポートにより、バッチ処理またはストリーミングのための XML データのインジェスト、クエリ、解析が可能になります。 スキーマとデータ型を自動的に推論および進化させることができ、from_xml などの SQL 式がサポートされ、XML ドキュメントを生成できます。 外部 jar は必要ありません。自動ローダー、read_filesCOPY INTO とシームレスに連携します。 オプションで、XML スキーマ定義 (XSD) に対して各行レベルの XML レコードを検証できます。

要件

Databricks Runtime 14.3 以降

XML レコードを解析する

XML の仕様では、整形式の構造が義務付けられています。 ただし、この仕様はすぐに表形式にマップされるものではありません。 DataFrame Row にマップする XML 要素を示すには、rowTag オプションを指定する必要があります。 rowTag 要素は最上位レベルの struct になります。 rowTag の子要素は最上位レベルの struct のフィールドになります。

このレコードのスキーマを指定することも、自動的に推論させることもできます。 パーサーは rowTag 要素のみ調査するため、DTD と外部エンティティはフィルター処理されません。

次の例は、さまざまな rowTag オプションを使用した XML ファイルのスキーマ推論と解析を示しています。

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

rowTag オプションで XML ファイルを "books "として読み取る:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

出力:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

rowTag で XML ファイルを "book "として読み取る:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

出力:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

データ ソース オプション

XML のデータ ソース オプションは、次の方法で指定できます。

オプションの一覧については、「自動ローダーのオプション」を参照してください。

XSD のサポート

オプションで、XML スキーマ定義 (XSD) によって各行レベルの XML レコードを検証できます。 XSD ファイルは rowValidationXSDPath オプションで指定されます。 XSD から、指定または推論されたスキーマにそれ以外の影響は及びません。 検証に失敗したレコードは "破損 "とマークされ、オプション セクションで説明されている破損レコード処理モード オプションに基づいて処理されます。

XSDToSchema を使用して、XSD ファイルから Spark DataFrame スキーマを抽出します。 これは単純型、複合型、シーケンス型のみと、基本的な XSD 機能のみをサポートしています。

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

次の表は、XSD データ型から Sparkデータ型への変換を示しています。

XSD データ型 Spark データ型
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
$ ShortType
IntegerType
$ LongType
date DateType
dateTime TimestampType
Others StringType

入れ子になった XML を解析する

schema_of_xmlfrom_xml を使用して既存の DataFrame 内の文字列値列の XML データを解析することで、スキーマと解析結果を新しい struct 列として返すことができます。 schema_of_xmlfrom_xml への引数として渡される XML データは、1 つの整形式の XML レコードである必要があります。

schema_of_xml

構文

schema_of_xml(xmlStr [, options] )

引数

  • xmlStr: 1 つの整形式の XML レコードを指定する STRING 式。
  • options: ディレクティブを指定する省略可能な MAP<STRING,STRING> リテラル。

返品

列名が XML 要素名と属性名から派生した、n 個の文字列フィールドを含む構造体の配列定義を保持する STRING。 フィールドの値は、派生した書式付き SQL 型を保持します。

from_xml

構文

from_xml(xmlStr, schema [, options])

引数

  • xmlStr: 1 つの整形式の XML レコードを指定する STRING 式。
  • schema: STRING 式または schema_of_xml 関数の呼び出し。
  • options: ディレクティブを指定する省略可能な MAP<STRING,STRING> リテラル。

返品

スキーマ定義と一致するフィールド名と型を備えた構造体。 スキーマは、コンマで区切られた列名とデータ型のペアとして定義する必要があります (例: CREATE TABLE)。 データ ソース オプションに示されているほとんどのオプションは、次の例外を除いて適用できます。

  • rowTag: XML レコードが 1 つしかないため、rowTag オプションは適用されません。
  • mode(既定値: PERMISSIVE): 解析中に破損したレコードを処理するモードを許可します。
    • PERMISSIVE: 破損したレコードに合致する場合は、columnNameOfCorruptRecord によって構成されたフィールドに形式に誤りがある文字列を格納し、形式に誤りがあるフィールドを null に設定します。 破損したレコードを保持するには、ユーザー定義スキーマで columnNameOfCorruptRecord という名前の文字列型フィールドを設定できます。 スキーマにフィールドがない場合、破損したレコードが解析中に削除されます。 スキーマを推論すると、出力スキーマに columnNameOfCorruptRecord フィールドが暗黙的に追加されます。
    • FAILFAST: 破損したレコードに合致する場合に、例外をスローします。

構造体の変換

DataFrame と XML の構造上の違いにより、XML データから DataFrame への変換規則と DataFrame から XML データへの変換規則がいくつか存在します。 オプション excludeAttribute を含む場合は、属性の処理を無効にできることに注意してください。

XML から DataFrame への変換

属性: 属性は、見出しのプレフィックス attributePrefix を含むフィールドとして変換されます。

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

次のスキーマを生成します。

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

属性または子要素を含む要素内の文字データ: これらは valueTag フィールドに解析されます。 文字データが複数ある場合、valueTag フィールドは array 型に変換されます。

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

次のスキーマを生成します。

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

DataFrame から XML への変換

配列内の配列としての要素: フィールド ArrayType の要素を ArrayType とした DataFrame から XML ファイルを書き込むと、その要素の入れ子になった追加フィールドが含められます。 これは、XML データの読み書きでは起こりませんが、他のソースから読み取られた DataFrame を書き込むときに起こります。 したがって、XML ファイルの読み取りと書き込みにおけるラウンドトリップと構造は同じですが、他のソースから読み取られた DataFrame を別の構造になるように書き込むことは可能です。

以下のスキーマを含む DataFrame:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

以下のデータを含む:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

以下の XML ファイルを生成する:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

DataFrame 内の名前のない配列の要素名はオプション arrayElementName で指定されます (既定値: item)。

復旧されたデータ列

復旧されたデータ列により、ETL 中にデータが失われたり、欠落したりすることがなくなります。 復旧されたデータ列を有効にすると、レコード内の 1 つ以上のフィールドに次のいずれかの問題があって解析されなかったデータをキャプチャできます。

  • 指定されたスキーマに存在しない
  • 指定されたスキーマのデータ型と一致しない
  • 指定されたスキーマのフィールド名と大文字と小文字の区別が一致しない

復旧されたデータ列は、復旧された列と、レコードのソース ファイル パスを含む JSON ドキュメントとして返されます。 復旧されたデータ列からソース ファイル パスを削除するには、次の SQL 構成を設定できます。

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

データの読み取り時にオプション rescuedDataColumn を列名に設定することで、復旧されたデータ列を有効にできます。たとえば、spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)_rescued_data です。

XML パーサーでは、レコードを解析するときに、PERMISSIVEDROPMALFORMEDFAILFAST の 3 つのモードがサポートされます。 rescuedDataColumn と共に使用すると、データ型の不一致によって、DROPMALFORMED モードでレコードが削除されたり、FAILFAST モードでエラーがスローされたりすることはありません。 破損したレコード (不完全であるか形式に誤りがある XML) のみ、削除されたり、エラーがスローされたりします。

自動ローダーでのスキーマの推論と展開

このトピックと適用可能なオプションの詳細については、「自動ローダーでのスキーマの推論と展開の構成」を参照してください。 読み込まれた XML データのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。

既定では、自動ローダー スキーマ推論では、型の不一致によるスキーマの進化の問題を回避します。 データ型 (JSON、CSV、および XML) をエンコードしない形式の場合、自動ローダーは、XML ファイル内の入れ子になったフィールドなどのすべての列を文字列として推論します。 Apache Spark DataFrameReader では、スキーマ推論に異なる動作が使用され、サンプル データに基づいて XML ソースの列のデータ型が選択されます。 自動ローダーでこの動作を有効にするには、オプション cloudFiles.inferColumnTypestrue に設定します。

自動ローダーでは、データを処理する際に新しい列の追加を検出します。 自動ローダーで新しい列が検出されると、ストリームが UnknownFieldException で停止します。 ストリームからこのエラーがスローされる前に、自動ローダーによって、データの最新のマイクロバッチに対してスキーマ推論が実行され、新しい列をスキーマの末尾にマージすることによって、スキーマの場所が最新のスキーマで更新されます。 既存の列のデータ型は変更されません。 自動ローダーでは、スキーマの展開に関するさまざまなモードがサポートされています。これは、cloudFiles.schemaEvolutionMode オプションで設定できます。

スキーマ ヒントを使用して、推論されたスキーマに対して知っているか予想されるスキーマ情報を適用できます。 列が特定のデータ型であることがわかっている場合や、さらに一般的なデータ型 (たとえば、整数ではなく浮動小数点を選択する場合は、SQL スキーマ仕様構文を使用して文字列として、列のデータ型に任意の数のヒントを指定できます。 復旧されたデータ列が有効になっているときは、スキーマの大文字または小文字とは異なる名前が付けられたフィールドが _rescued_data 列に読み込まれます。 この動作を変更するには、オプション readerCaseSensitivefalse に設定します。これにより、自動ローダーは大文字と小文字を区別せずにデータを読み取ります。

このセクションの例では、[Apache Spark GitHub リポジトリ] でダウンロード可能な XML ファイルを使用しています。

XML の読み取りと書き込み

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

次のデータの読み取り時にスキーマを手動で指定することができます。

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API

XML データ ソースは、次のデータ型を推測できます。

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

DDL で列名と型を指定することもできます。 この場合、スキーマは自動的に推論されません。

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

COPY INTO を使用した XML の読み込み

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

行の検証を使用した XML の読み取り

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

入れ子になった XML の解析 (from_xml と schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

SQL API を使用した from_xml と schema_of_xml

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

自動ローダーを使用した XML の読み込み

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

その他のリソース

spark-xml ライブラリを使用した XML データの読み取りと書き込み