Lectura y escritura de datos XML mediante la biblioteca spark-xml
Importante
Esta documentación se ha retirado y es posible que no se actualice. Los productos, servicios o tecnologías mencionados en este contenido no están oficialmente aprobados ni probados por Databricks.
La compatibilidad con el formato de archivo XML nativo está disponible como versión preliminar privada. Consulte Lectura y escritura de archivos XML.
En este artículo se describe cómo leer y escribir un archivo XML como un origen de datos de Apache Spark.
Requisitos
Cree la biblioteca
spark-xml
como una biblioteca de Maven. Para la coordenada de Maven, especifique:- Databricks Runtime 7.x y versiones posteriores:
com.databricks:spark-xml_2.12:<release>
Vea
spark-xml
Versiones para obtener la versión más reciente de<release>
.- Databricks Runtime 7.x y versiones posteriores:
Instale la biblioteca en un clúster.
Ejemplo
En el ejemplo de esta sección se usa el archivo XML de libros.
Recupere el archivo XML de los libros:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Cargue el archivo a DBFS.
Lectura y escritura de datos XML
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Opciones
- Lectura
path
: ubicación de los archivos XML. Acepta expresiones estándar de globbing de Hadoop.rowTag
: etiqueta de fila que se trata como una fila. Por ejemplo, en el XML<books><book><book>...</books>
, el valor seríabook
. El valor predeterminado esROW
.samplingRatio
: proporción de muestreo para deducir el esquema (0,0 ~ 1). El valor predeterminado es 1. Los tipos posibles sonStructType
,ArrayType
,StringType
,LongType
,DoubleType
,BooleanType
,TimestampType
yNullType
a menos que se proporcione un esquema.excludeAttribute
: si se excluyen los atributos de los elementos. El valor predeterminado es False.nullValue
: valor que se debe tratar como un valornull
. El valor predeterminado es""
.mode
: modo para tratar con registros dañados. El valor predeterminado esPERMISSIVE
.PERMISSIVE
:- Cuando encuentra un registro dañado, establece todos los campos en
null
y coloca la cadena con formato anterior en un nuevo campo configurado porcolumnNameOfCorruptRecord
. - Cuando encuentra un campo con el tipo de datos incorrecto, establece el campo infractor en
null
.
- Cuando encuentra un registro dañado, establece todos los campos en
DROPMALFORMED
: omite los registros dañados.FAILFAST
: inicia una excepción cuando detecta registros dañados.
inferSchema
: si estrue
, intenta deducir un tipo adecuado para cada columna DataFrame resultante, como un tipo booleano, numérico o de fecha. Si esfalse
, todas las columnas resultantes son de tipo cadena. El valor predeterminado estrue
.columnNameOfCorruptRecord
: el nombre del nuevo campo donde se almacenan las cadenas con formato de error. El valor predeterminado es_corrupt_record
.attributePrefix
: prefijo de atributos para diferenciar atributos y elementos. Este es el prefijo de los nombres de campo. El valor predeterminado es_
.valueTag
: etiqueta usada para el valor cuando hay atributos en un elemento que no tiene elementos secundarios. El valor predeterminado es_VALUE
.charset
: el valor predeterminado esUTF-8
, pero se puede establecer en otros nombres de juego de caracteres válidos.ignoreSurroundingSpaces
: si se deben omitir o no los espacios en blanco que rodean los valores. El valor predeterminado es False.rowValidationXSDPath
: ruta de acceso a un archivo XSD que se usa para validar el XML para cada fila. Las filas que no se validan se tratan como errores de análisis como se mencionó anteriormente. El XSD no afecta de otro modo al esquema proporcionado o inferido. Si la misma ruta de acceso local aún no está visible en los ejecutores del clúster, el XSD y cualquier otro del que dependa se deben agregar a los ejecutores de Spark con SparkContext.addFile. En este caso, para usar el XSD local/foo/bar.xsd
, llame aaddFile("/foo/bar.xsd")
y pase"bar.xsd"
comorowValidationXSDPath
.
- Escritura
path
: ubicación para escribir archivos.rowTag
: etiqueta de fila que se trata como una fila. Por ejemplo, en el XML<books><book><book>...</books>
, el valor seríabook
. El valor predeterminado esROW
.rootTag
: etiqueta raíz que se debe tratar como raíz. Por ejemplo, en el XML<books><book><book>...</books>
, el valor seríabooks
. El valor predeterminado esROWS
.nullValue
: valor para escribir el valornull
. El valor predeterminado es la cadena"null"
. Cuando es"null"
, no escribe atributos ni elementos para los campos.attributePrefix
: prefijo de atributos para diferenciar atributos y elementos. Este es el prefijo de los nombres de campo. El valor predeterminado es_
.valueTag
: etiqueta usada para el valor cuando hay atributos en un elemento que no tiene elementos secundarios. El valor predeterminado es_VALUE
.compression
: códec de compresión que se usará al guardar en el archivo. Debe ser el nombre completo de una clase que implementeorg.apache.hadoop.io.compress.CompressionCodec
o uno de los nombres cortos que no tienen en cuenta mayúsculas de minúsculas (bzip2
,gzip
,lz4
ysnappy
). El valor predeterminado es sin compresión.
Admite el uso abreviado de nombres; Puede usar xml
en lugar de com.databricks.spark.xml
.
Compatibilidad con XSD
Puede validar filas individuales con un esquema XSD mediante rowValidationXSDPath
.
Use la utilidad com.databricks.spark.xml.util.XSDToSchema
para extraer un esquema de DataFrame de Spark de algunos archivos XSD. Solo admite tipos simples, complejos y de secuencia, solo la funcionalidad XSD básica y es experimental.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Análisis de XML anidado
Aunque se usa principalmente para convertir un archivo XML en un DataFrame, también puede usar el método from_xml
para analizar XML en una columna con valores de cadena en un DataFrame existente y agregarlo como una nueva columna con resultados analizados como una estructura con:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Nota:
mode
:- Si se establece en
PERMISSIVE
, el valor predeterminado, el modo de análisis en su lugar tiene como valor predeterminadoDROPMALFORMED
. Si incluye una columna en el esquema parafrom_xml
que coincida concolumnNameOfCorruptRecord
, el modoPERMISSIVE
genera registros con formatos incorrectos en esa columna en la estructura resultante. - Si se establece en
DROPMALFORMED
, los valores XML que no se analizan correctamente tienen como resultado un valornull
para la columna. No se descarta ninguna fila.
- Si se establece en
from_xml
convierte matrices de cadenas que contienen XML en matrices de estructuras analizadas. En su lugar, useschema_of_xml_array
.from_xml_string
es una alternativa para su uso en UDF que funciona en una cadena directamente en lugar de en una columna.
Reglas de conversión
Debido a las diferencias estructurales entre DataFrames y XML, hay algunas reglas de conversión de datos XML a DataFrame y de DataFrame a datos XML. Puede deshabilitar el control de atributos con la opción excludeAttribute
.
Conversión de XML a DataFrame
Atributos: los atributos se convierten como campos con el prefijo especificado en la opción
attributePrefix
. SiattributePrefix
es_
, el documento<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
genera el esquema:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Si un elemento tiene atributos pero no elementos secundarios, el valor del atributo se coloca en un campo independiente especificado en la opción
valueTag
. SivalueTag
es_VALUE
, el documento<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
genera el esquema:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Convertir DataFrame en XML
Escribir un archivo XML de DataFrame que tenga un campo ArrayType
con su elemento como ArrayType
tendría un campo anidado adicional para el elemento. Esto no sucedería al leer y escribir datos XML, sino al escribir un DataFrame leído desde otros orígenes. Por lo tanto, el recorrido de ida y vuelta en la lectura y escritura de archivos XML tiene la misma estructura, pero escribir un DataFrame leído desde otros orígenes puede tener una estructura diferente.
DataFrame con el esquema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
y datos:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
genera el archivo XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>