Leggere e scrivere file XML
Importante
Questa funzionalità è disponibile in anteprima pubblica.
Questo articolo descrive come leggere e scrivere file XML.
Extensible Markup Language (XML) è un linguaggio di markup per la formattazione, l'archiviazione e la condivisione dei dati in formato testuale. Definisce un set di regole per la serializzazione dei dati, da documenti a strutture di dati arbitrarie.
Il supporto del formato di file XML nativo consente l'inserimento, l'esecuzione di query e l'analisi dei dati XML per l'elaborazione batch o lo streaming. Può dedurre ed evolvere automaticamente schemi e tipi di dati, supporta espressioni SQL come from_xml
e può generare documenti XML. Non richiede file JAR esterni e funziona perfettamente con il caricatore read_files
automatico e COPY INTO
. Facoltativamente, è possibile convalidare ogni record XML a livello di riga rispetto a una definizione di XML Schema (XSD).
Requisiti
Databricks Runtime 14.3 e versioni successive
Analizzare i record XML
La specifica XML impone una struttura ben formata. Tuttavia, questa specifica non esegue immediatamente il mapping a un formato tabulare. È necessario specificare l'opzione rowTag
per indicare l'elemento XML mappato a un oggetto DataFrame
Row
. L'elemento rowTag
diventa l'elemento di primo livello struct
. Gli elementi figlio di rowTag
diventano i campi del livello struct
superiore.
È possibile specificare lo schema per questo record o lasciarlo dedurre automaticamente. Poiché il parser esamina solo gli rowTag
elementi, le entità DTD ed esterne vengono filtrate.
Gli esempi seguenti illustrano l'inferenza dello schema e l'analisi di un file XML usando diverse opzioni di rowTag
:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Leggere il file XML con rowTag
l'opzione "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Output:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Leggere il file XML con rowTag
come "book":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Output:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opzioni origine dati
È possibile specificare le opzioni dell'origine dati per XML nei modi seguenti:
- Metodi
.option/.options
dei seguenti:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Le funzioni predefinite seguenti:
- Clausola
OPTIONS
di CREATE TABLE UTILIZZANDO DATA_SOURCE
Per un elenco delle opzioni, vedere opzioni del caricatore automatico.
Supporto XSD
Facoltativamente, è possibile convalidare ogni record XML a livello di riga da una definizione di XML Schema (XSD). Il file XSD viene specificato nell'opzione rowValidationXSDPath
. L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Un record che non riesce la convalida viene contrassegnato come "danneggiato" e gestito in base all'opzione modalità di gestione dei record danneggiata descritta nella sezione dell'opzione.
È possibile usare XSDToSchema
per estrarre uno schema del dataframe Spark da un file XSD. Supporta solo tipi di sequenza semplici, complessi e supporta solo le funzionalità XSD di base.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
La tabella seguente illustra la conversione dei tipi di dati XSD in tipi di dati Spark:
Tipi di dati XSD | Tipi di dati Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Analizzare xml annidato
I dati XML in una colonna con valori di stringa in un dataframe esistente possono essere analizzati con schema_of_xml
e from_xml
che restituisce lo schema e i risultati analizzati come nuove colonne struct
. I dati XML passati come argomento a schema_of_xml
e from_xml
devono essere un singolo record XML ben formato.
schema_of_xml
Sintassi
schema_of_xml(xmlStr [, options] )
Argomenti
-
xmlStr
: espressione STRING che specifica un singolo record XML ben formato. -
options
: valore letterale facoltativoMAP<STRING,STRING>
che specifica le direttive.
Resi
Stringa che contiene una definizione di una struttura dati con n campi di stringhe, in cui i nomi delle colonne sono derivati dai nomi degli elementi e degli attributi XML. I valori dei campi contengono i tipi SQL formattati derivati.
from_xml
Sintassi
from_xml(xmlStr, schema [, options])
Argomenti
-
xmlStr
: espressione STRING che specifica un singolo record XML ben formato. -
schema
: espressione STRING o chiamata dellaschema_of_xml
funzione. -
options
: valore letterale facoltativoMAP<STRING,STRING>
che specifica le direttive.
Resi
Una struttura con nomi di campo e tipi corrispondenti alla definizione dello schema. Lo schema deve essere definito come coppie di nomi di colonna e tipi di dati separate da virgole come utilizzato, ad esempio, in CREATE TABLE
. La maggior parte delle opzioni visualizzate nelle opzioni dell'origine dati è applicabile con le eccezioni seguenti:
-
rowTag
: poiché è presente un solo record XML, l'opzionerowTag
non è applicabile. -
mode
(impostazione predefinita:PERMISSIVE
): consente una modalità per gestire i record danneggiati durante l'analisi.-
PERMISSIVE
: quando soddisfa un record danneggiato, inserisce la stringa in formato non valido in un campo configurato dacolumnNameOfCorruptRecord
e imposta i campi in formato non valido sunull
. Per mantenere i record danneggiati, è possibile impostare un campo di tipo stringa denominatocolumnNameOfCorruptRecord
in uno schema definito dall'utente. Se uno schema non dispone del campo, elimina i record danneggiati durante l'analisi. Quando si deduce uno schema, aggiunge in modo implicito un campocolumnNameOfCorruptRecord
in uno schema di output. -
FAILFAST
: genera un'eccezione quando soddisfa i record danneggiati.
-
Conversione della struttura
A causa delle differenze di struttura tra DataFrame e XML, esistono alcune regole di conversione da dati XML a DataFrame
e da DataFrame
a dati XML. Si noti che la gestione degli attributi può essere disabilitata con l'opzione excludeAttribute
.
Conversione da XML a dataframe
Attributi: gli attributi vengono convertiti come campi con il prefisso attributePrefix
dell'intestazione .
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produce uno schema seguente:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Dati di tipo carattere in un elemento contenente attributi o elementi figlio: vengono analizzati nel valueTag
campo. Se sono presenti più occorrenze di dati di tipo carattere, il valueTag
campo viene convertito in un array
tipo.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
produce uno schema seguente:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversione da dataframe a XML
Elemento come matrice in una matrice: scrittura di un file XML da DataFrame
con un campo ArrayType
con il relativo elemento come ArrayType
avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma la scrittura di una DataFrame
lettura da altre origini. Pertanto, il round trip nella lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di una DataFrame
lettura da altre origini è possibile avere una struttura diversa.
DataFrame con uno schema seguente:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
e con i dati seguenti:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produce un file XML seguente:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Il nome dell'elemento della matrice senza nome in DataFrame
è specificato dall'opzione arrayElementName
(Default: item
).
Colonna di dati salvata
La colonna di dati recuperati garantisce che i dati non vadano persi durante l'ETL. È possibile abilitare la colonna di dati salvata per acquisire tutti i dati che non sono stati analizzati perché uno o più campi in un record presentano uno dei problemi seguenti:
- Assente dallo schema fornito
- Non corrisponde al tipo di dati dello schema fornito
- C'è una mancata corrispondenza di maiuscole e minuscole nei nomi dei campi del schema fornito.
La colonna di dati salvata viene restituita come documento JSON contenente le colonne salvate e il percorso del file di origine del record. Per rimuovere il percorso del file di origine dalla colonna di dati salvata, è possibile impostare la configurazione SQL seguente:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
È possibile abilitare la colonna di dati recuperata impostando l'opzione rescuedDataColumn
su un nome di colonna durante la lettura dei dati, ad esempio _rescued_data
con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Il parser XML supporta tre modalità durante l'analisi dei record: PERMISSIVE
, DROPMALFORMED
e FAILFAST
. Se usato insieme a rescuedDataColumn
, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED
oppure generano un errore in modalità FAILFAST
. Solo i record danneggiati (XML incompleto o non valido) vengono eliminati o generati errori.
Inferenza ed evoluzione dello schema nel caricatore automatico
Per una descrizione dettagliata di questo argomento e delle opzioni applicabili, vedere Configurare l'inferenza e l'evoluzione dello schema in Auto Loader. È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati XML caricati, consentendo di inizializzare le tabelle senza dichiarare esplicitamente lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo.
Per impostazione predefinita, l'inferenza dello schema di Auto Loader cerca di evitare problemi di evoluzione dello schema a causa di mancata corrispondenza dei tipi. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), Il caricatore automatico deduce tutte le colonne come stringhe, inclusi i campi annidati nei file XML. Apache Spark DataFrameReader
usa un comportamento differente per l'inferenza dello schema, selezionando i tipi di dati per le colonne delle origini XML in base ai dati di esempio. Per abilitare questo comportamento con il caricatore automatico, impostare l'opzione cloudFiles.inferColumnTypes
su true
.
Il caricatore automatico rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando il caricatore automatico rileva una nuova colonna, il flusso si arresta con un UnknownFieldException
. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza dello schema sull'ultimo micro-batch di dati e aggiorna il percorso dello schema con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati. Il caricatore automatico supporta diverse modalità per gestire l'evoluzione dello schema, che imposti nell'opzione cloudFiles.schemaEvolutionMode
.
È possibile usare hint dello schema per applicare le informazioni sullo schema note e previste in uno schema dedotto. Quando si sa che una colonna è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un valore double anziché un numero intero, è possibile fornire un numero arbitrario di hint per i tipi di dati di colonna come stringa usando la sintassi della specifica dello schema SQL. Quando la colonna di dati salvata è abilitata, i campi denominati in un caso diverso da quello dello schema vengono caricati nella colonna _rescued_data
. È possibile modificare questo comportamento impostando l'opzione readerCaseSensitive
su false
, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.
Esempi
Gli esempi in questa sezione usano un file XML disponibile per il download nel repository GitHub di Apache Spark.
Leggere e scrivere codice XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
È possibile specificare manualmente lo schema durante la lettura dei dati:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API SQL
L'origine dati XML può dedurre i tipi di dati:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
È anche possibile specificare nomi e tipi di colonna in DDL. In questo caso, lo schema non viene dedotto automaticamente.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Carica XML utilizzando COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Leggere il codice XML con la convalida delle righe
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analizzare xml annidato (from_xml e schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml e schema_of_xml con l'API SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Caricare codice XML con caricatore automatico
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)