XML 파일 읽기 및 쓰기
Important
이 기능은 공개 미리 보기 상태입니다.
이 문서에서는 XML 파일을 읽고 쓰는 방법을 설명합니다.
XML(Extensible Markup Language)은 데이터를 텍스트 형식으로 서식 지정, 저장 및 공유하기 위한 태그 언어입니다. 문서에서 임의의 데이터 구조에 이르는 데이터를 직렬화하는 규칙 집합을 정의합니다.
네이티브 XML 파일 형식 지원을 사용하면 일괄 처리 또는 스트리밍을 위해 XML 데이터를 수집, 쿼리 및 구문 분석할 수 있습니다. 스키마 및 데이터 형식을 자동으로 유추 및 발전시키고, from_xml
같은 SQL 식을 지원하며, XML 문서를 생성할 수 있습니다. 외부 jar가 필요하지 않으며 자동 로더 read_files
와 함께 원활하게 작동합니다 COPY INTO
. 필요에 따라 XSD(XML 스키마 정의)에 대해 각 행 수준 XML 레코드의 유효성을 검사할 수 있습니다.
요구 사항
Databricks Runtime 14.3 이상
XML 레코드 구문 분석
XML 사양은 올바른 형식의 구조를 지정합니다. 그러나 이 사양은 테이블 형식에 즉시 매핑되지는 않습니다. 에 매핑 rowTag
DataFrame
되는 Row
XML 요소를 나타내는 옵션을 지정해야 합니다. 요소가 rowTag
최상위 수준이 struct
됩니다. 최상위 수준의 rowTag
필드가 되는 자식 요소 struct
입니다.
이 레코드에 대한 스키마를 지정하거나 자동으로 유추할 수 있습니다. 파서는 요소만 검사 rowTag
하므로 DTD 및 외부 엔터티가 필터링됩니다.
다음 예제에서는 다른 rowTag
옵션을 사용하여 XML 파일의 스키마 유추 및 구문 분석을 보여 줍니다.
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
옵션을 "books"로 사용하여 XML 파일을 rowTag
읽습니다.
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
출력:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
XML 파일을 rowTag
"book"으로 읽습니다.
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
출력:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
데이터 원본 옵션
XML에 대한 데이터 원본 옵션은 다음과 같은 방법으로 지정할 수 있습니다.
-
.option/.options
다음의 메서드는 다음과 같습니다.- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 다음 기본 제공 함수는 다음과 같습니다.
-
OPTIONS
CREATE TABLE 절
옵션 목록은자동 로더 옵션
XSD 지원
필요에 따라 XSD(XML 스키마 정의)를 통해 각 행 수준 XML 레코드의 유효성을 검사할 수 있습니다. XSD 파일이 옵션에 rowValidationXSDPath
지정됩니다. 그렇지 않으면 XSD가 제공되거나 유추된 스키마에 영향을 주지 않습니다. 유효성 검사에 실패한 레코드는 옵션 섹션에 설명된 손상된 레코드 처리 모드 옵션에 따라 "손상됨"으로 표시되고 처리됩니다.
XSDToSchema
사용하여 XSD 파일에서 Spark DataFrame 스키마를 추출할 수 있습니다. 단순, 복합 및 시퀀스 형식만 지원하며 기본 XSD 기능만 지원합니다.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
다음 표에서는 XSD 데이터 형식을 Spark 데이터 형식으로 변환하는 방법을 보여 줍니다.
XSD 데이터 형식 | Spark 데이터 형식 |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
중첩된 XML 구문 분석
기존 DataFrame의 문자열 값 열에 있는 XML 데이터는 schema_of_xml
및 from_xml
을 사용하여 구문 분석할 수 있으며, 스키마와 구문 분석된 결과가 새로운 struct
열로 반환됩니다. XML 데이터는 인수 schema_of_xml
로 전달되며 from_xml
올바른 형식의 단일 XML 레코드여야 합니다.
schema_of_xml
Syntax
schema_of_xml(xmlStr [, options] )
인수
-
xmlStr
: 올바른 형식의 단일 XML 레코드를 지정하는 STRING 식입니다. -
options
: 지시문을 지정하는 선택적MAP<STRING,STRING>
리터럴입니다.
반환
n개의 문자열 필드를 가진 구조체 정의를 포함하는 STRING으로, 여기서 열 이름은 XML 요소 및 특성 이름에서 파생됩니다. 필드 값은 파생 형식의 SQL 형식을 보유합니다.
from_xml
Syntax
from_xml(xmlStr, schema [, options])
인수
-
xmlStr
: 올바른 형식의 단일 XML 레코드를 지정하는 STRING 식입니다. -
schema
: 문자열 식 또는 함수의 호출입니다schema_of_xml
. -
options
: 지시문을 지정하는 선택적MAP<STRING,STRING>
리터럴입니다.
반환
스키마 정의와 일치하는 필드 이름 및 형식이 있는 구조체입니다. 스키마는 쉼표로 구분된 열 이름 및 데이터 형식 쌍(예: CREATE TABLE
)으로 정의되어야 합니다. 데이터 원본 옵션에 표시된 대부분의 옵션은 다음과 같은 예외에 적용됩니다.
-
rowTag
: XML 레코드rowTag
가 하나뿐이므로 이 옵션을 적용할 수 없습니다. -
mode
(기본값:PERMISSIVE
): 구문 분석 중에 손상된 레코드를 처리하는 모드를 허용합니다.-
PERMISSIVE
: 손상된 레코드를 충족하는 경우 잘못된 형식의 문자열을 구성한columnNameOfCorruptRecord
필드에 넣고 형식이 잘못된 필드를null
.로 설정합니다. 손상된 레코드를 유지하려면 사용자 정의 스키마에서columnNameOfCorruptRecord
문자열 형식 필드를 설정할 수 있습니다. 스키마에 필드가 없으면 구문 분석 중에 손상된 레코드가 삭제됩니다. 스키마를 유추할 때 출력 스키마에columnNameOfCorruptRecord
필드를 암시적으로 추가합니다. -
FAILFAST
: 손상된 레코드를 충족하는 경우 예외를 throw합니다.
-
구조 변환
DataFrame과 XML 간의 구조 차이로 인해 XML 데이터에서 XML 데이터 간 DataFrame
DataFrame
변환 규칙이 있습니다. 옵션 excludeAttribute
으로 처리 특성을 사용하지 않도록 설정할 수 있습니다.
XML에서 DataFrame으로 변환
특성: 특성은 제목 접두사를 사용하여 필드로 변환됩니다 attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
는 아래 스키마를 생성합니다.
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
특성 또는 자식 요소를 포함하는 요소의 문자 데이터: 필드로 valueTag
구문 분석됩니다. 문자 데이터가 여러 번 발생하면 필드가 valueTag
형식으로 array
변환됩니다.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
는 아래 스키마를 생성합니다.
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame에서 XML로 변환
배열내부의 배열 요소 DataFrame
내용을 쓰는 경우에 발생합니다. 따라서 XML 파일 읽기 및 쓰기의 왕복은 구조가 동일하지만 다른 원본에서 읽기를 DataFrame
작성하는 것은 다른 구조를 가질 수 있습니다.
아래 스키마가 있는 DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
아래 데이터와 함께:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
는 아래 XML 파일을 생성합니다.
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
이름 없는 배열 DataFrame
의 요소 이름은 옵션 arrayElementName
(기본값: item
)으로 지정됩니다.
복구된 데이터 열
구조된 데이터 열은 ETL 중에 데이터를 손실하거나 놓치지 않도록 합니다. 파싱되지 않은 데이터를 캡처하기 위해, 다음 문제 중 하나가 레코드의 하나 이상의 필드에 있는 경우 복구된 데이터 열을 활성화할 수 있습니다.
- 제공된 스키마에 없습니다.
- 제공된 스키마의 데이터 형식과 일치하지 않습니다.
- 제공된 스키마의 필드 이름과 불일치가 있습니다.
구조된 데이터 열은 구조된 열과 레코드의 원본 파일 경로를 포함하는 JSON 문서로 반환됩니다. 복구된 데이터 열에서 원본 파일 경로를 제거하려면 다음 SQL 구성을 설정할 수 있습니다.
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
rescuedDataColumn
옵션을 열 이름으로 설정하여 spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
_rescued_data
과 같은 데이터를 읽을 때, 복원된 데이터 열을 활성화할 수 있습니다.
XML 파서는 레코드를 구문 분석할 때 세 가지 PERMISSIVE
DROPMALFORMED
FAILFAST
모드를 지원합니다.
rescuedDataColumn
과 함께 사용하면 데이터 형식 불일치로 인해 DROPMALFORMED
모드에서 레코드가 삭제되거나 FAILFAST
모드에서 오류가 throw되지 않습니다. 손상된 레코드(불완전하거나 형식이 잘못된 XML)만 삭제되거나 오류가 throw됩니다.
자동 로더의 스키마 유추 및 진화
이 항목 및 적용 가능한 옵션에 대한 자세한 내용은 자동 로더스키마 유추 및 진화 구성을 참조하세요. 로드된 XML 데이터의 스키마를 자동으로 검색하도록 자동 로더를 구성하여 데이터 스키마를 명시적으로 선언하지 않고 테이블을 초기화하고 새 열이 도입될 때 테이블 스키마를 발전시킬 수 있습니다. 이렇게 하면 시간이 지남에 따라 스키마 변경 내용을 수동으로 추적하고 적용할 필요가 없습니다.
기본적으로 자동 로더 스키마 유추는 형식 불일치로 인한 스키마 진화 문제를 방지하려고 합니다. 데이터 형식(JSON, CSV 및 XML)을 인코딩하지 않는 형식의 경우 자동 로더는 XML 파일의 중첩 필드를 포함하여 모든 열을 문자열로 유추합니다. Apache Spark DataFrameReader
스키마 유추에 다른 동작을 사용하여 샘플 데이터를 기반으로 XML 원본의 열에 대한 데이터 형식을 선택합니다. 자동 로더에서 이 동작을 사용하도록 설정하려면 옵션 cloudFiles.inferColumnTypes
true
설정합니다.
자동 로더는 데이터를 처리할 때 새 열의 추가를 검색합니다. 자동 로더가 새 열을 감지하면 스트림이 UnknownFieldException
오류 코드와 함께 중지됩니다. 스트림이 이 오류를 throw하기 전에 자동 로더는 데이터의 최신 마이크로 배치에 대한 스키마 유추를 수행하고 스키마의 끝에 새 열을 병합하여 스키마 위치를 최신 스키마로 업데이트합니다. 기존 열의 데이터 형식은 변경되지 않은 상태로 유지됩니다. 자동 로더는 스키마 진화을 위해
_rescued_data
열에 로드됩니다. 옵션을 readerCaseSensitive
설정하여 이 동작을 false
변경할 수 있습니다. 이 경우 자동 로더는 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다.
예제
이 섹션의 예제에서는 Apache Spark GitHub 리포지토리에서 다운로드할 수 있는 XML 파일을 사용합니다.
XML 읽기 및 쓰기
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
데이터를 읽을 때 스키마를 수동으로 지정할 수 있습니다.
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML 데이터 원본은 데이터 형식을 유추할 수 있습니다.
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL에서 열 이름 및 형식을 지정할 수도 있습니다. 이 경우 스키마는 자동으로 유추되지 않습니다.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO 사용하여 XML 로드
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
행 유효성 검사를 사용하여 XML 읽기
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
중첩된 XML 구문 분석(from_xml 및 schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API를 사용하여 from_xml 및 schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
자동 로더를 사용하여 XML 로드
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)