次の方法で共有


spark-xml ライブラリを使用した XML データの読み取りと書き込み

重要

このドキュメントは廃止され、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジは、Databricks によって正式には動作保証またはテストされていません。

ネイティブ XML ファイル形式のサポートは、プライベート プレビューとして利用できます。 「XML ファイルの読み取りと書き込み」をご覧ください。

この記事では、Apache Spark データ ソースとして XML ファイルを読み書きする方法について説明します。

必要条件

  1. Maven ライブラリとして spark-xml ライブラリを作成します。 Maven 座標には、次のように指定します。

    • Databricks Runtime 7.x 以降: com.databricks:spark-xml_2.12:<release>

    <release> の最新バージョンについては、spark-xml リリースを参照してください。

  2. クラスターにライブラリをインストールします

このセクションの例では、books XML ファイルを使用します。

  1. books XML ファイルを取得します。

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. DBFS にファイルをアップロードします。

XML データの読み取りと書き込み

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

[オプション]

  • Read
    • path: XML ファイルの場所。 標準の Hadoop グロビング式を受け入れます。
    • rowTag: 行として扱う行タグ。 たとえば、この XML <books><book><book>...</books> では、値は book になります。 既定値は ROW です。
    • samplingRatio: スキーマを推論するためのサンプリング率 (0.0 ~ 1)。 既定値は 1 です。 スキーマを指定しない限り、可能な型は StructTypeArrayTypeStringTypeLongTypeDoubleTypeBooleanTypeTimestampTypeNullType です。
    • excludeAttribute: 要素内の属性を除外するかどうか。 既定値は false です。
    • nullValue: null 値として扱う値。 既定値は "" です。
    • mode: 破損したレコードを処理するためのモード。 既定値は PERMISSIVE です。
      • PERMISSIVE $
        • 破損したレコードが検出された場合は、すべてのフィールドを null に設定し、誤りのある文字列を columnNameOfCorruptRecord で構成された新しいフィールドに入力します。
        • 間違ったデータ型のフィールドが検出された場合は、問題のあるフィールドを null に設定します。
      • DROPMALFORMED: 破損したレコードを無視します。
      • FAILFAST: 破損したレコードを検出した場合に、例外をスローします。
    • inferSchema: true の場合は、ブール型、数値型、日付型など、結果として得られる各データフレーム列に対して適切な型を推論しようとします。 false の場合、結果の列はすべて文字列型です。 既定値は true です。
    • columnNameOfCorruptRecord: 形式が正しくない文字列が格納される新しいフィールドの名前。 既定値は _corrupt_record です。
    • attributePrefix: 属性と要素を区別するための属性のプレフィックス。 これは、フィールド名のプレフィックスです。 既定値は _ です。
    • valueTag: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 既定値は _VALUE です。
    • charset: 既定値は UTF-8 に設定されますが、他の有効な文字セット名に設定できます。
    • ignoreSurroundingSpaces: 値の前後の空白文字をスキップするかどうか。 既定値は false です。
    • rowValidationXSDPath: 各行の XML の検証に使用される XSD ファイルへのパス。 検証に失敗した行は、上記のように解析エラーと同様に処理されます。 XSD から、指定または推論されたスキーマにそれ以外の影響は及びません。 同じローカル パスが、クラスター内の Executor にもまだ表示されていない場合は、XSD とそれが依存しているその他のパスを SparkContext.addFile を使用して Spark Executor に追加する必要があります。 この場合、ローカル XSD /foo/bar.xsd を使用するには、addFile("/foo/bar.xsd") を呼び出して、"bar.xsd"rowValidationXSDPath として渡します。
  • Write
    • path: ファイルを書き込む場所。
    • rowTag: 行として扱う行タグ。 たとえば、この XML <books><book><book>...</books> では、値は book になります。 既定値は ROW です。
    • rootTag: ルートとして扱うルート タグ。 たとえば、この XML <books><book><book>...</books> では、値は books になります。 既定値は ROWS です。
    • nullValue: null 値を書き込む値。 既定値は、文字列 "null" です。 "null" の場合、フィールドの属性と要素は書き込まれません。
    • attributePrefix: 属性と要素を区別するための属性のプレフィックス。 これは、フィールド名のプレフィックスです。 既定値は _ です。
    • valueTag: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 既定値は _VALUE です。
    • compression: ファイルに保存するときに使用する圧縮コーデック。 org.apache.hadoop.io.compress.CompressionCodec を実装するクラスの完全修飾名か、大文字と小文字を区別しない短い名前 (bzip2gziplz4snappy) のいずれかを指定する必要があります。 既定値は圧縮なしです。

短縮名の使用がサポートされています。com.databricks.spark.xml の代わりに xml を使用できます。

XSD のサポート

rowValidationXSDPath を使用して XSD スキーマに対して個々の行を検証できます。

ユーティリティ com.databricks.spark.xml.util.XSDToSchema を使用して、一部の XSD ファイルから Spark データフレーム スキーマを抽出します。 これは単純型、複合型、シーケンス型のみと、基本的な XSD 機能のみをサポートしており、試験段階にあります。

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

入れ子になった XML を解析する

主に XML ファイルをデータフレームに変換するために使用されますが、from_xml メソッドを使用して、既存のデータフレーム内の文字列値列の XML を解析し、次の構造体のような解析結果を含んだ新しい列として追加することもできます。

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Note

  • mode $
    • PERMISSIVE に設定した場合、解析モードは代わりに既定で、DROPMALFORMED に設定されます。 columnNameOfCorruptRecord に一致する from_xml のスキーマに列を含める場合、PERMISSIVE モードでは、結果の構造体のその列に、誤った形式のレコードが出力されます。
    • DROPMALFORMED に設定した場合、正しく解析されない XML 値は、列の null 値になります。 行は破棄されません。
  • from_xml では、XML を含む文字列の配列が、解析された構造体の配列に変換されます。 代わりに schema_of_xml_array を使用してください
  • from_xml_string は、列ではなく文字列を直接操作する、UDF で使用する代替手段です。

変換規則

データフレームと XML の構造上の違いにより、XML データからデータフレームへの変換規則とデータフレームから XML データへの変換規則がいくつか存在します。 オプション excludeAttribute を使用して、属性の処理を無効にできます。

XML をデータフレームに変換する

  • 属性: 属性は、attributePrefix オプションでプレフィックスが指定されたフィールドとして変換されます。 attributePrefix_ の場合、次のドキュメントは

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    次のスキーマを生成します。

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • 要素に続性があるが、子要素がない場合、属性値は、valueTag オプションで指定された個別のフィールドに置かれます。 valueTag_VALUE の場合、次のドキュメントは

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    次のスキーマを生成します。

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

データフレームを XML に変換する

フィールド ArrayType を持ち、その要素を ArrayType とした DataFrame から XML ファイルを書き込むと、その要素に対して追加の入れ子になったフィールドを持つことになります。 これは、XML データの読み書きでは起こりませんが、他のソースから読み取られた DataFrame を書き込むときに起こります。 したがって、XML ファイルの読み取りと書き込みにおけるラウンドトリップと構造は同じですが、他のソースから読み取られたデータフレームを別の構造になるように書き込むことは可能です。

次のスキーマ:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

および次のデータ:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

を含むデータフレームにより次の XML ファイルが生成されます。

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>