Compartir vía


Lectura y escritura de archivos XML

Importante

Esta característica está en versión preliminar pública.

En este artículo se describe cómo leer y escribir archivos XML.

El lenguaje de marcado extensible (XML) es un lenguaje de marcado para dar formato, almacenar y compartir datos en formato de texto. Define un conjunto de reglas para serializar datos que van desde documentos hasta estructuras de datos arbitrarias.

La compatibilidad con el formato de archivo XML nativo permite la ingesta, la consulta y el análisis de datos XML para el procesamiento por lotes o el streaming. Puede deducir y evolucionar automáticamente los tipos de datos y esquemas, admite expresiones SQL como from_xml, y puede generar documentos XML. No requiere archivos JAR externos y funciona sin problemas con el cargador automático read_files y COPY INTO. Opcionalmente, puede validar cada registro XML de nivel de fila con una definición de esquema XML (XSD).

Requisitos

Databricks Runtime 14.3 y versiones posteriores

Análisis de registros XML

La especificación XML exige una estructura bien formada. Sin embargo, esta especificación no se asigna inmediatamente a un formato tabular. Debe especificar la opción rowTag para indicar el elemento XML que se asigna a DataFrameRow. El elemento rowTag se convierte en el nivel superior struct. Los elementos secundarios de rowTag se convierten en los campos del nivel superior struct.

Puede especificar el esquema de este registro o dejar que se infiera automáticamente. Dado que el analizador solo examina los elementos rowTag, se filtran DTD y entidades externas.

En los ejemplos siguientes se muestra la inferencia de esquema y el análisis de un archivo XML mediante rowTag diferentes opciones:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Lea el archivo XML con la rowTag opción "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Salida:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Lea el archivo XML con rowTag como "libro":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Salida:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opciones de origen de datos

Las opciones del origen de datos para XML se pueden especificar de las maneras siguientes:

Para obtener una lista de opciones, vea Opciones de cargador automático.

Compatibilidad con XSD

Opcionalmente, puede validar cada registro XML de nivel de fila mediante una definición de esquema XML (XSD). El archivo XSD se especifica en la opción rowValidationXSDPath. El XSD no afecta de otro modo al esquema proporcionado o inferido. Un registro que produce un error en la validación se marca como "dañado" y se controla en función de la opción de modo de control de registros dañados descrita en la sección de opciones.

Puede usar XSDToSchema para extraer un esquema de DataFrame de Spark de un archivo XSD. Solo admite tipos simples, complejos y de secuencia, y solo admite la funcionalidad básica XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

En la tabla siguiente se muestra la conversión de tipos de datos XSD a tipos de datos de Spark:

Tipo de datos XSD Tipos de datos de Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Análisis de XML anidado

Los datos XML de una columna con valores de cadena de una trama de datos existente se pueden analizar con schema_of_xml y from_xml que devuelven el esquema y los resultados analizados como nuevas columnas struct. Los datos XML pasados como argumento a schema_of_xml y from_xml deben ser un único registro XML con formato correcto.

schema_of_xml

Sintaxis

schema_of_xml(xmlStr [, options] )

Argumentos

  • xmlStr: expresión STRING que especifica un único registro XML con formato correcto.
  • options: un literal MAP<STRING,STRING> opcional que especifica directivas.

Devuelve

Una CADENA que contiene una definición de una estructura con n campos de cadenas cuyos nombres de columna se derivan de los nombres de elementos y atributos XML. Los valores de los campos contienen los tipos SQL con formato derivados.

from_xml

Sintaxis

from_xml(xmlStr, schema [, options])

Argumentos

  • xmlStr: expresión STRING que especifica un único registro XML con formato correcto.
  • schema: expresión STRING o invocación de la función schema_of_xml.
  • options: un literal MAP<STRING,STRING> opcional que especifica directivas.

Devuelve

Una estructura con nombres de campo y tipos que coinciden con la definición de esquema. El esquema debe definirse como pares de tipo de datos y nombre de columna separados por comas como se usa en, por ejemplo, CREATE TABLE. La mayoría de las opciones que se muestran en las opciones del origen de datos son aplicables con las siguientes excepciones:

  • rowTag: dado que solo hay un registro XML, la rowTag opción no es aplicable.
  • mode (valor predeterminado PERMISSIVE): permite un modo para controlar los registros dañados durante el análisis.
    • PERMISSIVE: cuando cumple un registro dañado, coloca la cadena con formato incorrecto en un campo configurado por columnNameOfCorruptRecord y establece campos con formato incorrecto en null. Para mantener registros dañados, puede establecer un campo de tipo de cadena denominado columnNameOfCorruptRecord en un esquema definido por el usuario. Si un esquema no tiene el campo, quita los registros dañados durante el análisis. Al deducir un esquema, agrega implícitamente un campo columnNameOfCorruptRecord en un esquema de salida.
    • FAILFAST: inicia una excepción cuando detecta registros dañados.

Conversión de estructura

Debido a las diferencias de estructura entre DataFrame y XML, hay algunas reglas de conversión de datos XML a DataFrame y de DataFrame a datos XML. Tenga en cuenta que el control de atributos se puede deshabilitar con la opción excludeAttribute.

Conversión de XML a DataFrame

Atributos: los atributos se convierten como campos con el prefijo de encabezado attributePrefix.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

genera un esquema siguiente:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Datos de caracteres de un elemento que contiene atributos o elementos secundarios: estos se analizan en el campo valueTag. Si hay varias apariciones de datos de caracteres, el campo valueTag se convierte en un tipo array.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

genera un esquema siguiente:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversión de DataFrame a XML

Elemento como una matriz en una matriz: escribir un archivo XML de DataFrame tener un campo ArrayType con su elemento como ArrayType tendría un campo anidado adicional para el elemento. Esto no ocurriría al leer y escribir datos XML, sino al escribir una lectura DataFrame de otros orígenes. Por lo tanto, el recorrido de ida y vuelta en la lectura y escritura de archivos XML tiene la misma estructura, pero escribir una lectura DataFrame de otros orígenes es posible tener una estructura diferente.

DataFrame con un esquema siguiente:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

y con los datos siguientes:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

genera un archivo XML siguiente:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

El nombre del elemento de la matriz sin nombre en DataFrame es especificado por la opción arrayElementName (Valor predeterminado: item).

Columna de datos rescatados

La columna de datos rescatados garantiza que nunca se pierdan datos durante la extracción, transformación y carga de datos (ETL). Puede habilitar la columna de datos rescatados para capturar cualquier dato que no se haya analizado debido a que uno o más campos de un registro presentan uno de los siguientes problemas:

  • Ausente del esquema proporcionado
  • No coincide con el tipo de datos del esquema proporcionado
  • Tiene un error de coincidencia de mayúsculas y minúsculas con los nombres de campo del esquema proporcionado

La columna de datos rescatados se devuelve como un documento JSON que contiene las columnas que se rescataron y la ruta de acceso del archivo de origen del registro. Para quitar la ruta de acceso del archivo de origen de la columna de datos rescate, puede establecer la siguiente configuración de SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Puede habilitar la columna de datos rescatados estableciendo la opción rescuedDataColumn en un nombre de columna al leer los datos, como _rescued_data con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

El analizador XML admite tres modos al analizar registros: PERMISSIVE, DROPMALFORMED, y FAILFAST. Cuando se usan junto con rescuedDataColumn, las discrepancias de tipo de datos no hacen que los registros se eliminen en el modo DROPMALFORMED ni que se genere un error en el modo FAILFAST. Solo los registros dañados (XML incompletos o con formato incorrecto) se quitan o producen errores.

Inferencia y evolución de esquemas en el cargador automático

Para obtener una explicación detallada de este tema y las opciones aplicables, consulte Configuración de la inferencia de esquemas y la evolución en Auto Loader. Puede configurar Auto Loader para detectar automáticamente el esquema de datos XML cargados, lo que le permite inicializar tablas sin declarar explícitamente el esquema de datos y evolucionar el esquema de tabla a medida que se introducen nuevas columnas. Esto elimina la necesidad de realizar un seguimiento manual y aplicar los cambios de esquema a lo largo del tiempo.

De manera predeterminada, la inferencia de esquemas de Auto Loader busca evitar problemas de evolución del esquema debido a errores de coincidencia de tipos. Para los formatos que no codifican tipos de datos (JSON, CSV y XML), el cargador automático deduce todas las columnas como cadenas, incluidos los campos anidados en archivos XML. Apache Spark DataFrameReader usa un comportamiento diferente para la inferencia de esquemas, seleccionando tipos de datos para columnas en orígenes XML en función de los datos de ejemplo. Para habilitar este comportamiento con Auto Loader, establezca la opción cloudFiles.inferColumnTypes en true.

El cargador automático detecta la adición de nuevas columnas a medida que procesa los datos. Cuando Auto Loader detecta una nueva columna, la secuencia se detiene con un UnknownFieldException. Antes de que la secuencia genere este error, Auto Loader realiza la inferencia de esquemas en el microlote de datos más reciente y actualiza la ubicación del esquema con el esquema más reciente combinando las columnas nuevas al final del esquema. Los tipos de datos de las columnas existentes permanecen sin cambios. Auto Loader admite diferentes modos de para la evolución del esquema, que se establece en la opción cloudFiles.schemaEvolutionMode.

Puede usar sugerencias de esquema para aplicar la información de esquema que conoce y espera en un esquema inferido. Cuando sabe que una columna es de un tipo de datos específico o si desea elegir un tipo de datos más general (por ejemplo, un doble en lugar de un entero), puede proporcionar un número arbitrario de sugerencias para los tipos de datos de columna como una cadena mediante la sintaxis de especificación del esquema SQL. Cuando la columna de datos rescatados está habilitada, los campos nombrados en un caso distinto al del esquema se cargan en la columna _rescued_data. Para cambiar este comportamiento, establezca la opción readerCaseSensitive en false, en cuyo caso el cargador automático lee los datos de una manera que no distingue mayúsculas de minúsculas.

Ejemplos

En los ejemplos de esta sección se usa un archivo XML disponible para su descarga en el Repositorio de GitHub de Apache Spark.

Lectura y escritura de XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Puede especificar manualmente el esquema al leer datos:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API DE SQL

El origen de datos XML puede deducir tipos de datos:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

También puede especificar nombres y tipos de columna en DDL. En este caso, el esquema no se deduce automáticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carga de XML mediante COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Leer XML con validación de filas

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analizar XML anidado (from_xml y schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml y schema_of_xml con SQL API

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Carga de XML con cargador automático

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Recursos adicionales

Leer y escribir datos XML mediante la biblioteca spark-xml