XML 파일 읽기 및 쓰기
Important
이 기능은 공개 미리 보기 상태입니다.
이 문서에서는 XML 파일을 읽고 쓰는 방법을 설명합니다.
XML(Extensible Markup Language)은 데이터를 텍스트 형식으로 서식 지정, 저장 및 공유하기 위한 태그 언어입니다. 문서에서 임의의 데이터 구조에 이르는 데이터를 직렬화하는 규칙 집합을 정의합니다.
네이티브 XML 파일 형식 지원을 사용하면 일괄 처리 또는 스트리밍을 위해 XML 데이터를 수집, 쿼리 및 구문 분석할 수 있습니다. 스키마 및 데이터 형식을 자동으로 유추 및 발전시키고, from_xml
(와)과 같은 SQL 식을 지원하며, XML 문서를 생성할 수 있습니다. 외부 jar가 필요하지 않으며 자동 로더 read_files
와 함께 원활하게 작동합니다 COPY INTO
. 필요에 따라 XSD(XML 스키마 정의)에 대해 각 행 수준 XML 레코드의 유효성을 검사할 수 있습니다.
요구 사항
Databricks Runtime 14.3 이상
XML 레코드 구문 분석
XML 사양은 올바른 형식의 구조를 지정합니다. 그러나 이 사양은 테이블 형식에 즉시 매핑되지는 않습니다. 에 매핑 DataFrame
Row
되는 rowTag
XML 요소를 나타내는 옵션을 지정해야 합니다. 요소가 rowTag
최상위 수준이 struct
됩니다. 최상위 수준의 struct
필드가 되는 자식 요소 rowTag
입니다.
이 레코드에 대한 스키마를 지정하거나 자동으로 유추할 수 있습니다. 파서는 요소만 검사 rowTag
하므로 DTD 및 외부 엔터티가 필터링됩니다.
다음 예제에서는 다른 rowTag
옵션을 사용하여 XML 파일의 스키마 유추 및 구문 분석을 보여 줍니다.
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
옵션을 "books"로 사용하여 XML 파일을 rowTag
읽습니다.
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
출력:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
XML 파일을 rowTag
"book"으로 읽습니다.
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
출력:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
데이터 원본 옵션
XML에 대한 데이터 원본 옵션은 다음과 같은 방법으로 지정할 수 있습니다.
.option/.options
다음의 메서드는 다음과 같습니다.- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 다음 기본 제공 함수는 다음과 같습니다.
OPTIONS
CREATE TABLE USING DATA_SOURCE 절
옵션 목록은 자동 로더 옵션을 참조 하세요.
XSD 지원
필요에 따라 XSD(XML 스키마 정의)를 통해 각 행 수준 XML 레코드의 유효성을 검사할 수 있습니다. XSD 파일이 옵션에 rowValidationXSDPath
지정됩니다. XSD는 제공되거나 유추된 스키마에 달리 영향을 미치지 않습니다. 유효성 검사에 실패한 레코드는 옵션 섹션에 설명된 손상된 레코드 처리 모드 옵션에 따라 "손상됨"으로 표시되고 처리됩니다.
XSD 파일에서 Spark DataFrame 스키마를 추출하는 데 사용할 XSDToSchema
수 있습니다. 단순, 복합 및 시퀀스 형식만 지원하며 기본 XSD 기능만 지원합니다.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
다음 표에서는 XSD 데이터 형식을 Spark 데이터 형식으로 변환하는 방법을 보여 줍니다.
XSD 데이터 형식 | Spark 데이터 형식 |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
중첩된 XML 구문 분석
기존 DataFrame의 문자열 반환 열에 있는 XML 데이터를 구문 분석 schema_of_xml
할 수 있으며 스키마와 from_xml
구문 분석된 결과를 새 struct
열로 반환합니다. XML 데이터는 인수 schema_of_xml
로 전달되며 from_xml
올바른 형식의 단일 XML 레코드여야 합니다.
schema_of_xml
Syntax
schema_of_xml(xmlStr [, options] )
인수
xmlStr
: 올바른 형식의 단일 XML 레코드를 지정하는 STRING 식입니다.options
: 지시문을 지정하는 선택적MAP<STRING,STRING>
리터럴입니다.
반환
열 이름이 XML 요소 및 특성 이름에서 파생되는 문자열 필드가 n개인 구조체 정의를 포함하는 STRING입니다. 필드 값은 파생된 형식이 지정된 SQL 형식을 보유합니다.
from_xml
Syntax
from_xml(xmlStr, schema [, options])
인수
xmlStr
: 올바른 형식의 단일 XML 레코드를 지정하는 STRING 식입니다.schema
: 문자열 식 또는 함수의 호출입니다schema_of_xml
.options
: 지시문을 지정하는 선택적MAP<STRING,STRING>
리터럴입니다.
반환
스키마 정의와 일치하는 필드 이름 및 형식이 있는 구조체입니다. 스키마는 쉼표로 구분된 열 이름 및 데이터 형식 쌍으로 정의되어야 합니다( 예: CREATE TABLE
). 데이터 원본 옵션에 표시된 대부분의 옵션은 다음과 같은 예외에 적용됩니다.
rowTag
: XML 레코드rowTag
가 하나뿐이므로 이 옵션을 적용할 수 없습니다.mode
(기본값:PERMISSIVE
): 구문 분석 중에 손상된 레코드를 처리하는 모드를 허용합니다.PERMISSIVE
: 손상된 레코드를 충족하는 경우 잘못된 형식의 문자열을 구성한columnNameOfCorruptRecord
필드에 넣고 형식이 잘못된 필드를null
.로 설정합니다. 손상된 레코드를 유지하려면 사용자 정의 스키마에 명명된columnNameOfCorruptRecord
문자열 형식 필드를 설정할 수 있습니다. 스키마에 필드가 없으면 구문 분석 중에 손상된 레코드를 삭제합니다. 스키마를 유추할 때 출력 스키마에columnNameOfCorruptRecord
필드를 암시적으로 추가합니다.FAILFAST
: 손상된 레코드를 충족하는 경우 예외를 throw합니다.
구조 변환
DataFrame과 XML 간의 구조 차이로 인해 XML 데이터에서 XML 데이터 간 DataFrame
DataFrame
변환 규칙이 있습니다. 옵션 excludeAttribute
으로 처리 특성을 사용하지 않도록 설정할 수 있습니다.
XML에서 DataFrame으로 변환
특성: 특성은 제목 접두사를 사용하여 필드로 변환됩니다 attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
는 아래 스키마를 생성합니다.
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
특성 또는 자식 요소를 포함하는 요소의 문자 데이터: 필드로 valueTag
구문 분석됩니다. 문자 데이터가 여러 번 발생하면 필드가 valueTag
형식으로 array
변환됩니다.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
는 아래 스키마를 생성합니다.
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
DataFrame에서 XML로 변환
배열의 배열로 요소: 요소에 대한 추가 중첩 필드가 있는 것처럼 ArrayType
해당 요소가 있는 필드 ArrayType
가 없으면 XML 파일을 DataFrame
작성합니다. 이는 XML 데이터를 읽고 쓰는 것이 아니라 다른 원본에서 읽은 DataFrame
내용을 쓰는 경우에 발생합니다. 따라서 XML 파일 읽기 및 쓰기의 왕복은 구조가 동일하지만 다른 원본에서 읽기를 DataFrame
작성하는 것은 다른 구조를 가질 수 있습니다.
아래 스키마가 있는 DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
아래 데이터와 함께:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
는 아래 XML 파일을 생성합니다.
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
이름 없는 배열 DataFrame
의 요소 이름은 옵션 arrayElementName
(기본값: item
)으로 지정됩니다.
복구된 데이터 열
복구된 데이터 열은 ETL 중에 데이터를 손실하거나 누락하지 않도록 합니다. 레코드의 하나 이상의 필드에 다음 문제 중 하나가 있으므로 구문 분석되지 않은 데이터를 캡처하도록 구조된 데이터 열을 사용하도록 설정할 수 있습니다.
- 제공된 스키마가 없습니다.
- 제공된 스키마의 데이터 형식과 일치하지 않습니다.
- 제공된 스키마의 필드 이름과 대/소문자 불일치가 있습니다.
복구된 데이터 열은 복구된 열과 레코드의 원본 파일 경로를 포함하는 JSON 문서로 반환됩니다. 복구된 데이터 열에서 원본 파일 경로를 제거하려면 다음 SQL 구성을 설정할 수 있습니다.
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
데이터를 읽을 때 rescuedDataColumn
옵션을 열 이름으로 설정하여 복구된 데이터 열을 사용하도록 설정할 수 있습니다(예: spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
가 있는 _rescued_data
).
XML 파서는 레코드를 구문 분석할 때 세 가지 PERMISSIVE
DROPMALFORMED
FAILFAST
모드를 지원합니다. rescuedDataColumn
과 함께 사용하면 데이터 형식 불일치로 인해 DROPMALFORMED
모드에서 레코드가 삭제되거나 FAILFAST
모드에서 오류가 throw되지 않습니다. 손상된 레코드(불완전하거나 형식이 잘못된 XML)만 삭제되거나 오류가 throw됩니다.
자동 로더의 스키마 유추 및 진화
이 항목 및 적용 가능한 옵션에 대한 자세한 내용은 자동 로더에서 스키마 유추 및 진화 구성을 참조 하세요. 로드된 XML 데이터의 스키마를 자동으로 검색하도록 자동 로더를 구성하여 데이터 스키마를 명시적으로 선언하지 않고 테이블을 초기화하고 새 열이 도입될 때 테이블 스키마를 발전시킬 수 있습니다. 이렇게 하면 시간이 지남에 따라 스키마 변경 내용을 수동으로 추적하고 적용할 필요가 없습니다.
기본적으로 자동 로더 스키마 유추는 형식 불일치로 인한 스키마 진화 이슈를 방지하려고 합니다. 데이터 형식(JSON, CSV 및 XML)을 인코딩하지 않는 형식의 경우 자동 로더는 XML 파일의 중첩 필드를 포함하여 모든 열을 문자열로 유추합니다. Apache Spark DataFrameReader
는 스키마 유추에 다른 동작을 사용하여 샘플 데이터를 기반으로 XML 원본의 열에 대한 데이터 형식을 선택합니다. 자동 로더에서 이 동작을 사용하도록 설정하려면 cloudFiles.inferColumnTypes
옵션을 true
로 설정합니다.
자동 로더는 데이터를 처리할 때 새 열 추가를 검색합니다. 자동 로더가 새 열을 검색하면 스트림이 .로더를 사용하여 UnknownFieldException
중지됩니다. 스트림에서 이 오류를 throw하기 전에 자동 로더는 데이터의 최신 마이크로 일괄 처리에 대한 스키마 유추를 수행하고, 스키마 끝에 새 열을 병합하여 스키마 위치를 최신 스키마로 업데이트합니다. 기존 열의 데이터 형식은 변경되지 않은 상태로 유지됩니다. 자동 로더는 스키마 진화를 위한 다양한 모드를 지원하며, 이 모드는 옵션cloudFiles.schemaEvolutionMode
에서 설정합니다.
스키마 힌트를 사용하여 유추된 스키마에서 알고 예상하는 스키마 정보를 적용할 수 있습니다. 열이 특정 데이터 형식임을 알고 있거나 보다 일반적인 데이터 형식(예: 정수 대신 double)을 선택하려는 경우 SQL 스키마 사양 구문을 사용하여 열 데이터 형식에 대한 임의의 힌트 수를 문자열로 제공할 수 있습니다. 구조된 데이터 열을 사용하도록 설정하면 스키마가 아닌 경우 명명된 필드가 열에 _rescued_data
로드됩니다. 옵션을 readerCaseSensitive
설정하여 이 동작을 false
변경할 수 있습니다. 이 경우 자동 로더는 대/소문자를 구분하지 않는 방식으로 데이터를 읽습니다.
예제
이 섹션의 예제에서는 Apache Spark GitHub 리포지토리에서 다운로드할 수 있는 XML 파일을 사용합니다.
XML 읽기 및 쓰기
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
데이터를 읽을 때 스키마를 수동으로 지정할 수 있습니다.
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML 데이터 원본은 데이터 형식을 유추할 수 있습니다.
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
DDL에서 열 이름 및 형식을 지정할 수도 있습니다. 이 경우 스키마는 자동으로 유추되지 않습니다.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
COPY INTO를 사용하여 XML 로드
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
행 유효성 검사를 사용하여 XML 읽기
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
중첩된 XML 구문 분석(from_xml 및 schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
SQL API를 사용하여 from_xml 및 schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
자동 로더를 사용하여 XML 로드
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)