Lectura y escritura de archivos XML
Importante
Esta característica está en versión preliminar pública.
En este artículo se describe cómo leer y escribir archivos XML.
El lenguaje de marcado extensible (XML) es un lenguaje de marcado para dar formato, almacenar y compartir datos en formato de texto. Define un conjunto de reglas para serializar datos que van desde documentos hasta estructuras de datos arbitrarias.
La compatibilidad con el formato de archivo XML nativo permite la ingesta, la consulta y el análisis de datos XML para el procesamiento por lotes o el streaming. Puede deducir y evolucionar automáticamente los tipos de datos y esquemas, admite expresiones SQL como from_xml
, y puede generar documentos XML. No requiere archivos JAR externos y funciona sin problemas con el cargador automático read_files
y COPY INTO
. Opcionalmente, puede validar cada registro XML de nivel de fila con una definición de esquema XML (XSD).
Requisitos
Databricks Runtime 14.3 y versiones posteriores
Análisis de registros XML
La especificación XML exige una estructura bien formada. Sin embargo, esta especificación no se asigna inmediatamente a un formato tabular. Debe especificar la opción rowTag
para indicar el elemento XML que se asigna a DataFrame
Row
. El elemento rowTag
se convierte en el nivel superior struct
. Los elementos secundarios de rowTag
se convierten en los campos del nivel superior struct
.
Puede especificar el esquema de este registro o dejar que se infiera automáticamente. Dado que el analizador solo examina los elementos rowTag
, se filtran DTD y entidades externas.
En los ejemplos siguientes se muestra la inferencia de esquema y el análisis de un archivo XML mediante rowTag
diferentes opciones:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Lea el archivo XML con la rowTag
opción "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Salida:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Lea el archivo XML con rowTag
como "libro":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Salida:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opciones de origen de datos
Las opciones del origen de datos para XML se pueden especificar de las maneras siguientes:
- Los
.option/.options
métodos de lo siguiente:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Las siguientes funciones integradas:
- Cláusula
OPTIONS
de CREATE TABLE USING DATA_SOURCE
Para obtener una lista de opciones, vea Opciones de cargador automático.
Compatibilidad con XSD
Opcionalmente, puede validar cada registro XML de nivel de fila mediante una definición de esquema XML (XSD). El archivo XSD se especifica en la opción rowValidationXSDPath
. El XSD no afecta de otro modo al esquema proporcionado o inferido. Un registro que produce un error en la validación se marca como "dañado" y se controla en función de la opción de modo de control de registros dañados descrita en la sección de opciones.
Puede usar XSDToSchema
para extraer un esquema de DataFrame de Spark de un archivo XSD. Solo admite tipos simples, complejos y de secuencia, y solo admite la funcionalidad básica XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
En la tabla siguiente se muestra la conversión de tipos de datos XSD a tipos de datos de Spark:
Tipo de datos XSD | Tipos de datos de Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger , unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Análisis de XML anidado
Los datos XML de una columna con valores de cadena de una trama de datos existente se pueden analizar con schema_of_xml
y from_xml
que devuelven el esquema y los resultados analizados como nuevas columnas struct
. Los datos XML pasados como argumento a schema_of_xml
y from_xml
deben ser un único registro XML con formato correcto.
schema_of_xml
Sintaxis
schema_of_xml(xmlStr [, options] )
Argumentos
xmlStr
: expresión STRING que especifica un único registro XML con formato correcto.options
: un literalMAP<STRING,STRING>
opcional que especifica directivas.
Devuelve
Una CADENA que contiene una definición de una estructura con n campos de cadenas cuyos nombres de columna se derivan de los nombres de elementos y atributos XML. Los valores de los campos contienen los tipos SQL con formato derivados.
from_xml
Sintaxis
from_xml(xmlStr, schema [, options])
Argumentos
xmlStr
: expresión STRING que especifica un único registro XML con formato correcto.schema
: expresión STRING o invocación de la funciónschema_of_xml
.options
: un literalMAP<STRING,STRING>
opcional que especifica directivas.
Devuelve
Una estructura con nombres de campo y tipos que coinciden con la definición de esquema. El esquema debe definirse como pares de tipo de datos y nombre de columna separados por comas como se usa en, por ejemplo, CREATE TABLE
. La mayoría de las opciones que se muestran en las opciones del origen de datos son aplicables con las siguientes excepciones:
rowTag
: dado que solo hay un registro XML, larowTag
opción no es aplicable.mode
(valor predeterminadoPERMISSIVE
): permite un modo para controlar los registros dañados durante el análisis.PERMISSIVE
: cuando cumple un registro dañado, coloca la cadena con formato incorrecto en un campo configurado porcolumnNameOfCorruptRecord
y establece campos con formato incorrecto ennull
. Para mantener registros dañados, puede establecer un campo de tipo de cadena denominadocolumnNameOfCorruptRecord
en un esquema definido por el usuario. Si un esquema no tiene el campo, quita los registros dañados durante el análisis. Al deducir un esquema, agrega implícitamente un campocolumnNameOfCorruptRecord
en un esquema de salida.FAILFAST
: inicia una excepción cuando detecta registros dañados.
Conversión de estructura
Debido a las diferencias de estructura entre DataFrame y XML, hay algunas reglas de conversión de datos XML a DataFrame
y de DataFrame
a datos XML. Tenga en cuenta que el control de atributos se puede deshabilitar con la opción excludeAttribute
.
Conversión de XML a DataFrame
Atributos: los atributos se convierten como campos con el prefijo de encabezado attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
genera un esquema siguiente:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Datos de caracteres de un elemento que contiene atributos o elementos secundarios: estos se analizan en el campo valueTag
. Si hay varias apariciones de datos de caracteres, el campo valueTag
se convierte en un tipo array
.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
genera un esquema siguiente:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversión de DataFrame a XML
Elemento como una matriz en una matriz: escribir un archivo XML de DataFrame
tener un campo ArrayType
con su elemento como ArrayType
tendría un campo anidado adicional para el elemento. Esto no ocurriría al leer y escribir datos XML, sino al escribir una lectura DataFrame
de otros orígenes. Por lo tanto, el recorrido de ida y vuelta en la lectura y escritura de archivos XML tiene la misma estructura, pero escribir una lectura DataFrame
de otros orígenes es posible tener una estructura diferente.
DataFrame con un esquema siguiente:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
y con los datos siguientes:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
genera un archivo XML siguiente:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
El nombre del elemento de la matriz sin nombre en DataFrame
es especificado por la opción arrayElementName
(Valor predeterminado: item
).
Columna de datos rescatados
La columna de datos rescatados garantiza que nunca se pierdan datos durante la extracción, transformación y carga de datos (ETL). Puede habilitar la columna de datos rescatados para capturar cualquier dato que no se haya analizado debido a que uno o más campos de un registro presentan uno de los siguientes problemas:
- Ausente del esquema proporcionado
- No coincide con el tipo de datos del esquema proporcionado
- Tiene un error de coincidencia de mayúsculas y minúsculas con los nombres de campo del esquema proporcionado
La columna de datos rescatados se devuelve como un documento JSON que contiene las columnas que se rescataron y la ruta de acceso del archivo de origen del registro. Para quitar la ruta de acceso del archivo de origen de la columna de datos rescate, puede establecer la siguiente configuración de SQL:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Puede habilitar la columna de datos rescatados estableciendo la opción rescuedDataColumn
en un nombre de columna al leer los datos, como _rescued_data
con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
El analizador XML admite tres modos al analizar registros: PERMISSIVE
, DROPMALFORMED
, y FAILFAST
. Cuando se usan junto con rescuedDataColumn
, las discrepancias de tipo de datos no hacen que los registros se eliminen en el modo DROPMALFORMED
ni que se genere un error en el modo FAILFAST
. Solo los registros dañados (XML incompletos o con formato incorrecto) se quitan o producen errores.
Inferencia y evolución de esquemas en el cargador automático
Para obtener una explicación detallada de este tema y las opciones aplicables, consulte Configuración de la inferencia de esquemas y la evolución en Auto Loader. Puede configurar Auto Loader para detectar automáticamente el esquema de datos XML cargados, lo que le permite inicializar tablas sin declarar explícitamente el esquema de datos y evolucionar el esquema de tabla a medida que se introducen nuevas columnas. Esto elimina la necesidad de realizar un seguimiento manual y aplicar los cambios de esquema a lo largo del tiempo.
De manera predeterminada, la inferencia de esquemas de Auto Loader busca evitar problemas de evolución del esquema debido a errores de coincidencia de tipos. Para los formatos que no codifican tipos de datos (JSON, CSV y XML), el cargador automático deduce todas las columnas como cadenas, incluidos los campos anidados en archivos XML. Apache Spark DataFrameReader
usa un comportamiento diferente para la inferencia de esquemas, seleccionando tipos de datos para columnas en orígenes XML en función de los datos de ejemplo. Para habilitar este comportamiento con Auto Loader, establezca la opción cloudFiles.inferColumnTypes
en true
.
El cargador automático detecta la adición de nuevas columnas a medida que procesa los datos. Cuando Auto Loader detecta una nueva columna, la secuencia se detiene con un UnknownFieldException
. Antes de que la secuencia genere este error, Auto Loader realiza la inferencia de esquemas en el microlote de datos más reciente y actualiza la ubicación del esquema con el esquema más reciente combinando las columnas nuevas al final del esquema. Los tipos de datos de las columnas existentes permanecen sin cambios. Auto Loader admite diferentes modos de para la evolución del esquema, que se establece en la opción cloudFiles.schemaEvolutionMode
.
Puede usar sugerencias de esquema para aplicar la información de esquema que conoce y espera en un esquema inferido. Cuando sabe que una columna es de un tipo de datos específico o si desea elegir un tipo de datos más general (por ejemplo, un doble en lugar de un entero), puede proporcionar un número arbitrario de sugerencias para los tipos de datos de columna como una cadena mediante la sintaxis de especificación del esquema SQL. Cuando la columna de datos rescatados está habilitada, los campos nombrados en un caso distinto al del esquema se cargan en la columna _rescued_data
. Para cambiar este comportamiento, establezca la opción readerCaseSensitive
en false
, en cuyo caso el cargador automático lee los datos de una manera que no distingue mayúsculas de minúsculas.
Ejemplos
En los ejemplos de esta sección se usa un archivo XML disponible para su descarga en el Repositorio de GitHub de Apache Spark.
Lectura y escritura de XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Puede especificar manualmente el esquema al leer datos:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API DE SQL
El origen de datos XML puede deducir tipos de datos:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
También puede especificar nombres y tipos de columna en DDL. En este caso, el esquema no se deduce automáticamente.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Cargar XML mediante COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Leer XML con validación de filas
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analizar XML anidado (from_xml y schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml y schema_of_xml con SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Carga de XML con cargador automático
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)