Compartir a través de


Lectura y escritura de datos XML mediante la biblioteca spark-xml

Importante

Esta documentación se ha retirado y es posible que no se actualice. Los productos, servicios o tecnologías mencionados en este contenido no están oficialmente aprobados ni probados por Databricks.

La compatibilidad con el formato de archivo XML nativo está disponible como versión preliminar privada. Consulte Lectura y escritura de archivos XML.

En este artículo se describe cómo leer y escribir un archivo XML como un origen de datos de Apache Spark.

Requisitos

  1. Cree la biblioteca spark-xml como una biblioteca de Maven. Para la coordenada de Maven, especifique:

    • Databricks Runtime 7.x y versiones posteriores: com.databricks:spark-xml_2.12:<release>

    Vea spark-xmlVersiones para obtener la versión más reciente de <release>.

  2. Instale la biblioteca en un clúster.

Ejemplo

En el ejemplo de esta sección se usa el archivo XML de libros.

  1. Recupere el archivo XML de los libros:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Cargue el archivo a DBFS.

Lectura y escritura de datos XML

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Opciones

  • Lectura
    • path: ubicación de los archivos XML. Acepta expresiones estándar de globbing de Hadoop.
    • rowTag: etiqueta de fila que se trata como una fila. Por ejemplo, en el XML <books><book><book>...</books>, el valor sería book. El valor predeterminado es ROW.
    • samplingRatio: proporción de muestreo para deducir el esquema (0,0 ~ 1). El valor predeterminado es 1. Los tipos posibles son StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType y NullType a menos que se proporcione un esquema.
    • excludeAttribute: si se excluyen los atributos de los elementos. El valor predeterminado es False.
    • nullValue: valor que se debe tratar como un valor null. El valor predeterminado es "".
    • mode: modo para tratar con registros dañados. El valor predeterminado es PERMISSIVE.
      • PERMISSIVE:
        • Cuando encuentra un registro dañado, establece todos los campos en null y coloca la cadena con formato anterior en un nuevo campo configurado por columnNameOfCorruptRecord.
        • Cuando encuentra un campo con el tipo de datos incorrecto, establece el campo infractor en null.
      • DROPMALFORMED: omite los registros dañados.
      • FAILFAST: inicia una excepción cuando detecta registros dañados.
    • inferSchema: si es true, intenta deducir un tipo adecuado para cada columna DataFrame resultante, como un tipo booleano, numérico o de fecha. Si es false, todas las columnas resultantes son de tipo cadena. El valor predeterminado es true.
    • columnNameOfCorruptRecord: el nombre del nuevo campo donde se almacenan las cadenas con formato de error. El valor predeterminado es _corrupt_record.
    • attributePrefix: prefijo de atributos para diferenciar atributos y elementos. Este es el prefijo de los nombres de campo. El valor predeterminado es _.
    • valueTag: etiqueta usada para el valor cuando hay atributos en un elemento que no tiene elementos secundarios. El valor predeterminado es _VALUE.
    • charset: el valor predeterminado es UTF-8, pero se puede establecer en otros nombres de juego de caracteres válidos.
    • ignoreSurroundingSpaces: si se deben omitir o no los espacios en blanco que rodean los valores. El valor predeterminado es False.
    • rowValidationXSDPath: ruta de acceso a un archivo XSD que se usa para validar el XML para cada fila. Las filas que no se validan se tratan como errores de análisis como se mencionó anteriormente. El XSD no afecta de otro modo al esquema proporcionado o inferido. Si la misma ruta de acceso local aún no está visible en los ejecutores del clúster, el XSD y cualquier otro del que dependa se deben agregar a los ejecutores de Spark con SparkContext.addFile. En este caso, para usar el XSD local /foo/bar.xsd, llame a addFile("/foo/bar.xsd") y pase "bar.xsd" como rowValidationXSDPath.
  • Escritura
    • path: ubicación para escribir archivos.
    • rowTag: etiqueta de fila que se trata como una fila. Por ejemplo, en el XML <books><book><book>...</books>, el valor sería book. El valor predeterminado es ROW.
    • rootTag: etiqueta raíz que se debe tratar como raíz. Por ejemplo, en el XML <books><book><book>...</books>, el valor sería books. El valor predeterminado es ROWS.
    • nullValue: valor para escribir el valor null. El valor predeterminado es la cadena "null". Cuando es "null", no escribe atributos ni elementos para los campos.
    • attributePrefix: prefijo de atributos para diferenciar atributos y elementos. Este es el prefijo de los nombres de campo. El valor predeterminado es _.
    • valueTag: etiqueta usada para el valor cuando hay atributos en un elemento que no tiene elementos secundarios. El valor predeterminado es _VALUE.
    • compression: códec de compresión que se usará al guardar en el archivo. Debe ser el nombre completo de una clase que implemente org.apache.hadoop.io.compress.CompressionCodec o uno de los nombres cortos que no tienen en cuenta mayúsculas de minúsculas (bzip2, gzip, lz4y snappy). El valor predeterminado es sin compresión.

Admite el uso abreviado de nombres; Puede usar xml en lugar de com.databricks.spark.xml.

Compatibilidad con XSD

Puede validar filas individuales con un esquema XSD mediante rowValidationXSDPath.

Use la utilidad com.databricks.spark.xml.util.XSDToSchema para extraer un esquema de DataFrame de Spark de algunos archivos XSD. Solo admite tipos simples, complejos y de secuencia, solo la funcionalidad XSD básica y es experimental.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Análisis de XML anidado

Aunque se usa principalmente para convertir un archivo XML en un DataFrame, también puede usar el método from_xml para analizar XML en una columna con valores de cadena en un DataFrame existente y agregarlo como una nueva columna con resultados analizados como una estructura con:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Nota:

  • mode:
    • Si se establece en PERMISSIVE, el valor predeterminado, el modo de análisis en su lugar tiene como valor predeterminado DROPMALFORMED. Si incluye una columna en el esquema para from_xml que coincida con columnNameOfCorruptRecord, el modo PERMISSIVE genera registros con formatos incorrectos en esa columna en la estructura resultante.
    • Si se establece en DROPMALFORMED, los valores XML que no se analizan correctamente tienen como resultado un valor null para la columna. No se descarta ninguna fila.
  • from_xml convierte matrices de cadenas que contienen XML en matrices de estructuras analizadas. En su lugar, use schema_of_xml_array.
  • from_xml_string es una alternativa para su uso en UDF que funciona en una cadena directamente en lugar de en una columna.

Reglas de conversión

Debido a las diferencias estructurales entre DataFrames y XML, hay algunas reglas de conversión de datos XML a DataFrame y de DataFrame a datos XML. Puede deshabilitar el control de atributos con la opción excludeAttribute.

Conversión de XML a DataFrame

  • Atributos: los atributos se convierten como campos con el prefijo especificado en la opción attributePrefix. Si attributePrefix es _, el documento

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    genera el esquema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Si un elemento tiene atributos pero no elementos secundarios, el valor del atributo se coloca en un campo independiente especificado en la opción valueTag. Si valueTag es _VALUE, el documento

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    genera el esquema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Convertir DataFrame en XML

Escribir un archivo XML de DataFrame que tenga un campo ArrayType con su elemento como ArrayType tendría un campo anidado adicional para el elemento. Esto no sucedería al leer y escribir datos XML, sino al escribir un DataFrame leído desde otros orígenes. Por lo tanto, el recorrido de ida y vuelta en la lectura y escritura de archivos XML tiene la misma estructura, pero escribir un DataFrame leído desde otros orígenes puede tener una estructura diferente.

DataFrame con el esquema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

y datos:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

genera el archivo XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>