spark-xml
ライブラリを使用した XML データの読み取りと書き込み
重要
このドキュメントは廃止され、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジは、Databricks によって正式には動作保証またはテストされていません。
ネイティブ XML ファイル形式のサポートは、プライベート プレビューとして利用できます。 「XML ファイルの読み取りと書き込み」をご覧ください。
この記事では、Apache Spark データ ソースとして XML ファイルを読み書きする方法について説明します。
必要条件
Maven ライブラリとして
spark-xml
ライブラリを作成します。 Maven 座標には、次のように指定します。- Databricks Runtime 7.x 以降:
com.databricks:spark-xml_2.12:<release>
<release>
の最新バージョンについては、spark-xml
リリースを参照してください。- Databricks Runtime 7.x 以降:
クラスターにライブラリをインストールします。
例
このセクションの例では、books XML ファイルを使用します。
books XML ファイルを取得します。
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
DBFS にファイルをアップロードします。
XML データの読み取りと書き込み
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
[オプション]
- Read
path
: XML ファイルの場所。 標準の Hadoop グロビング式を受け入れます。rowTag
: 行として扱う行タグ。 たとえば、この XML<books><book><book>...</books>
では、値はbook
になります。 既定値はROW
です。samplingRatio
: スキーマを推論するためのサンプリング率 (0.0 ~ 1)。 既定値は 1 です。 スキーマを指定しない限り、可能な型はStructType
、ArrayType
、StringType
、LongType
、DoubleType
、BooleanType
、TimestampType
、NullType
です。excludeAttribute
: 要素内の属性を除外するかどうか。 既定値は false です。nullValue
:null
値として扱う値。 既定値は""
です。mode
: 破損したレコードを処理するためのモード。 既定値はPERMISSIVE
です。PERMISSIVE
$- 破損したレコードが検出された場合は、すべてのフィールドを
null
に設定し、誤りのある文字列をcolumnNameOfCorruptRecord
で構成された新しいフィールドに入力します。 - 間違ったデータ型のフィールドが検出された場合は、問題のあるフィールドを
null
に設定します。
- 破損したレコードが検出された場合は、すべてのフィールドを
DROPMALFORMED
: 破損したレコードを無視します。FAILFAST
: 破損したレコードを検出した場合に、例外をスローします。
inferSchema
:true
の場合は、ブール型、数値型、日付型など、結果として得られる各データフレーム列に対して適切な型を推論しようとします。false
の場合、結果の列はすべて文字列型です。 既定値はtrue
です。columnNameOfCorruptRecord
: 形式が正しくない文字列が格納される新しいフィールドの名前。 既定値は_corrupt_record
です。attributePrefix
: 属性と要素を区別するための属性のプレフィックス。 これは、フィールド名のプレフィックスです。 既定値は_
です。valueTag
: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 既定値は_VALUE
です。charset
: 既定値はUTF-8
に設定されますが、他の有効な文字セット名に設定できます。ignoreSurroundingSpaces
: 値の前後の空白文字をスキップするかどうか。 既定値は false です。rowValidationXSDPath
: 各行の XML の検証に使用される XSD ファイルへのパス。 検証に失敗した行は、上記のように解析エラーと同様に処理されます。 XSD から、指定または推論されたスキーマにそれ以外の影響は及びません。 同じローカル パスが、クラスター内の Executor にもまだ表示されていない場合は、XSD とそれが依存しているその他のパスを SparkContext.addFile を使用して Spark Executor に追加する必要があります。 この場合、ローカル XSD/foo/bar.xsd
を使用するには、addFile("/foo/bar.xsd")
を呼び出して、"bar.xsd"
をrowValidationXSDPath
として渡します。
- Write
path
: ファイルを書き込む場所。rowTag
: 行として扱う行タグ。 たとえば、この XML<books><book><book>...</books>
では、値はbook
になります。 既定値はROW
です。rootTag
: ルートとして扱うルート タグ。 たとえば、この XML<books><book><book>...</books>
では、値はbooks
になります。 既定値はROWS
です。nullValue
:null
値を書き込む値。 既定値は、文字列"null"
です。"null"
の場合、フィールドの属性と要素は書き込まれません。attributePrefix
: 属性と要素を区別するための属性のプレフィックス。 これは、フィールド名のプレフィックスです。 既定値は_
です。valueTag
: 子要素を持たない要素に属性がある場合に値に使用されるタグ。 既定値は_VALUE
です。compression
: ファイルに保存するときに使用する圧縮コーデック。org.apache.hadoop.io.compress.CompressionCodec
を実装するクラスの完全修飾名か、大文字と小文字を区別しない短い名前 (bzip2
、gzip
、lz4
、snappy
) のいずれかを指定する必要があります。 既定値は圧縮なしです。
短縮名の使用がサポートされています。com.databricks.spark.xml
の代わりに xml
を使用できます。
XSD のサポート
rowValidationXSDPath
を使用して XSD スキーマに対して個々の行を検証できます。
ユーティリティ com.databricks.spark.xml.util.XSDToSchema
を使用して、一部の XSD ファイルから Spark データフレーム スキーマを抽出します。 これは単純型、複合型、シーケンス型のみと、基本的な XSD 機能のみをサポートしており、試験段階にあります。
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
入れ子になった XML を解析する
主に XML ファイルをデータフレームに変換するために使用されますが、from_xml
メソッドを使用して、既存のデータフレーム内の文字列値列の XML を解析し、次の構造体のような解析結果を含んだ新しい列として追加することもできます。
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Note
mode
$PERMISSIVE
に設定した場合、解析モードは代わりに既定で、DROPMALFORMED
に設定されます。columnNameOfCorruptRecord
に一致するfrom_xml
のスキーマに列を含める場合、PERMISSIVE
モードでは、結果の構造体のその列に、誤った形式のレコードが出力されます。DROPMALFORMED
に設定した場合、正しく解析されない XML 値は、列のnull
値になります。 行は破棄されません。
from_xml
では、XML を含む文字列の配列が、解析された構造体の配列に変換されます。 代わりにschema_of_xml_array
を使用してくださいfrom_xml_string
は、列ではなく文字列を直接操作する、UDF で使用する代替手段です。
変換規則
データフレームと XML の構造上の違いにより、XML データからデータフレームへの変換規則とデータフレームから XML データへの変換規則がいくつか存在します。 オプション excludeAttribute
を使用して、属性の処理を無効にできます。
XML をデータフレームに変換する
属性: 属性は、
attributePrefix
オプションでプレフィックスが指定されたフィールドとして変換されます。attributePrefix
が_
の場合、次のドキュメントは<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
次のスキーマを生成します。
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
要素に続性があるが、子要素がない場合、属性値は、
valueTag
オプションで指定された個別のフィールドに置かれます。valueTag
が_VALUE
の場合、次のドキュメントは<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
次のスキーマを生成します。
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
データフレームを XML に変換する
フィールド ArrayType
を持ち、その要素を ArrayType
とした DataFrame から XML ファイルを書き込むと、その要素に対して追加の入れ子になったフィールドを持つことになります。 これは、XML データの読み書きでは起こりませんが、他のソースから読み取られた DataFrame を書き込むときに起こります。 したがって、XML ファイルの読み取りと書き込みにおけるラウンドトリップと構造は同じですが、他のソースから読み取られたデータフレームを別の構造になるように書き込むことは可能です。
次のスキーマ:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
および次のデータ:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
を含むデータフレームにより次の XML ファイルが生成されます。
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>