Condividi tramite


Leggere e scrivere file XML

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Questo articolo descrive come leggere e scrivere file XML.

Extensible Markup Language (XML) è un linguaggio di markup per la formattazione, l'archiviazione e la condivisione dei dati in formato testuale. Definisce un set di regole per la serializzazione dei dati, da documenti a strutture di dati arbitrarie.

Il supporto del formato di file XML nativo consente l'inserimento, l'esecuzione di query e l'analisi dei dati XML per l'elaborazione batch o lo streaming. Può dedurre ed evolvere automaticamente schemi e tipi di dati, supporta espressioni SQL come from_xmle può generare documenti XML. Non richiede file JAR esterni e funziona perfettamente con il caricatore read_files automatico e COPY INTO. Facoltativamente, è possibile convalidare ogni record XML a livello di riga rispetto a una definizione di XML Schema (XSD).

Requisiti

Databricks Runtime 14.3 e versioni successive

Analizzare i record XML

La specifica XML impone una struttura ben formata. Tuttavia, questa specifica non esegue immediatamente il mapping a un formato tabulare. È necessario specificare l'opzione rowTag per indicare l'elemento XML mappato a un oggetto DataFrameRow. L'elemento rowTag diventa l'elemento di primo livello struct. Gli elementi figlio di rowTag diventano i campi del livello structsuperiore.

È possibile specificare lo schema per questo record o lasciarlo dedurre automaticamente. Poiché il parser esamina solo gli rowTag elementi, le entità DTD ed esterne vengono filtrate.

Gli esempi seguenti illustrano l'inferenza dello schema e l'analisi di un file XML usando diverse opzioni di rowTag:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Leggere il file XML con rowTag l'opzione "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Output:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Leggere il file XML con rowTag come "book":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Output:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opzioni origine dati

È possibile specificare le opzioni dell'origine dati per XML nei modi seguenti:

Per un elenco delle opzioni, vedere opzioni del caricatore automatico.

Supporto XSD

Facoltativamente, è possibile convalidare ogni record XML a livello di riga da una definizione di XML Schema (XSD). Il file XSD viene specificato nell'opzione rowValidationXSDPath . L'XSD non influisce in caso contrario sullo schema fornito o dedotto. Un record che non riesce la convalida viene contrassegnato come "danneggiato" e gestito in base all'opzione modalità di gestione dei record danneggiata descritta nella sezione dell'opzione.

È possibile usare XSDToSchema per estrarre uno schema del dataframe Spark da un file XSD. Supporta solo tipi di sequenza semplici, complessi e supporta solo le funzionalità XSD di base.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

La tabella seguente illustra la conversione dei tipi di dati XSD in tipi di dati Spark:

Tipi di dati XSD Tipi di dati Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analizzare xml annidato

I dati XML in una colonna con valori di stringa in un dataframe esistente possono essere analizzati con schema_of_xml e from_xml che restituisce lo schema e i risultati analizzati come nuove colonne struct. I dati XML passati come argomento a schema_of_xml e from_xml devono essere un singolo record XML ben formato.

schema_of_xml

Sintassi

schema_of_xml(xmlStr [, options] )

Argomenti

  • xmlStr: espressione STRING che specifica un singolo record XML ben formato.
  • options: valore letterale facoltativo MAP<STRING,STRING> che specifica le direttive.

Resi

Stringa che contiene una definizione di una struttura dati con n campi di stringhe, in cui i nomi delle colonne sono derivati dai nomi degli elementi e degli attributi XML. I valori dei campi contengono i tipi SQL formattati derivati.

from_xml

Sintassi

from_xml(xmlStr, schema [, options])

Argomenti

  • xmlStr: espressione STRING che specifica un singolo record XML ben formato.
  • schema: espressione STRING o chiamata della schema_of_xml funzione.
  • options: valore letterale facoltativo MAP<STRING,STRING> che specifica le direttive.

Resi

Una struttura con nomi di campo e tipi corrispondenti alla definizione dello schema. Lo schema deve essere definito come coppie di nomi di colonna e tipi di dati separate da virgole come utilizzato, ad esempio, in CREATE TABLE. La maggior parte delle opzioni visualizzate nelle opzioni dell'origine dati è applicabile con le eccezioni seguenti:

  • rowTag: poiché è presente un solo record XML, l'opzione rowTag non è applicabile.
  • mode (impostazione predefinita: PERMISSIVE): consente una modalità per gestire i record danneggiati durante l'analisi.
    • PERMISSIVE: quando soddisfa un record danneggiato, inserisce la stringa in formato non valido in un campo configurato da columnNameOfCorruptRecorde imposta i campi in formato non valido su null. Per mantenere i record danneggiati, è possibile impostare un campo di tipo stringa denominato columnNameOfCorruptRecord in uno schema definito dall'utente. Se uno schema non dispone del campo, elimina i record danneggiati durante l'analisi. Quando si deduce uno schema, aggiunge in modo implicito un campo columnNameOfCorruptRecord in uno schema di output.
    • FAILFAST: genera un'eccezione quando soddisfa i record danneggiati.

Conversione della struttura

A causa delle differenze di struttura tra DataFrame e XML, esistono alcune regole di conversione da dati XML a DataFrame e da DataFrame a dati XML. Si noti che la gestione degli attributi può essere disabilitata con l'opzione excludeAttribute.

Conversione da XML a dataframe

Attributi: gli attributi vengono convertiti come campi con il prefisso attributePrefixdell'intestazione .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produce uno schema seguente:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dati di tipo carattere in un elemento contenente attributi o elementi figlio: vengono analizzati nel valueTag campo. Se sono presenti più occorrenze di dati di tipo carattere, il valueTag campo viene convertito in un array tipo.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produce uno schema seguente:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversione da dataframe a XML

Elemento come matrice in una matrice: scrittura di un file XML da DataFrame con un campo ArrayType con il relativo elemento come ArrayType avrebbe un campo annidato aggiuntivo per l'elemento. Ciò non avviene durante la lettura e la scrittura di dati XML, ma la scrittura di una DataFrame lettura da altre origini. Pertanto, il round trip nella lettura e scrittura di file XML ha la stessa struttura, ma la scrittura di una DataFrame lettura da altre origini è possibile avere una struttura diversa.

DataFrame con uno schema seguente:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e con i dati seguenti:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produce un file XML seguente:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Il nome dell'elemento della matrice senza nome in DataFrame è specificato dall'opzione arrayElementName (Default: item).

Colonna di dati salvata

La colonna di dati recuperati garantisce che i dati non vadano persi durante l'ETL. È possibile abilitare la colonna di dati salvata per acquisire tutti i dati che non sono stati analizzati perché uno o più campi in un record presentano uno dei problemi seguenti:

  • Assente dallo schema fornito
  • Non corrisponde al tipo di dati dello schema fornito
  • C'è una mancata corrispondenza di maiuscole e minuscole nei nomi dei campi del schema fornito.

La colonna di dati salvata viene restituita come documento JSON contenente le colonne salvate e il percorso del file di origine del record. Per rimuovere il percorso del file di origine dalla colonna di dati salvata, è possibile impostare la configurazione SQL seguente:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

È possibile abilitare la colonna di dati recuperata impostando l'opzione rescuedDataColumn su un nome di colonna durante la lettura dei dati, ad esempio _rescued_data con spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Il parser XML supporta tre modalità durante l'analisi dei record: PERMISSIVE, DROPMALFORMEDe FAILFAST. Se usato insieme a rescuedDataColumn, le mancate corrispondenze del tipo di dati non causano l'esclusione dei record in modalità DROPMALFORMED oppure generano un errore in modalità FAILFAST. Solo i record danneggiati (XML incompleto o non valido) vengono eliminati o generati errori.

Inferenza ed evoluzione dello schema nel caricatore automatico

Per una descrizione dettagliata di questo argomento e delle opzioni applicabili, vedere Configurare l'inferenza e l'evoluzione dello schema in Auto Loader. È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati XML caricati, consentendo di inizializzare le tabelle senza dichiarare esplicitamente lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo.

Per impostazione predefinita, l'inferenza dello schema di Auto Loader cerca di evitare problemi di evoluzione dello schema a causa di mancata corrispondenza dei tipi. Per i formati che non codificano i tipi di dati (JSON, CSV e XML), Il caricatore automatico deduce tutte le colonne come stringhe, inclusi i campi annidati nei file XML. Apache Spark DataFrameReader usa un comportamento differente per l'inferenza dello schema, selezionando i tipi di dati per le colonne delle origini XML in base ai dati di esempio. Per abilitare questo comportamento con il caricatore automatico, impostare l'opzione cloudFiles.inferColumnTypes su true.

Il caricatore automatico rileva l'aggiunta di nuove colonne durante l'elaborazione dei dati. Quando il caricatore automatico rileva una nuova colonna, il flusso si arresta con un UnknownFieldException. Prima che il flusso generi questo errore, Auto Loader esegue l'inferenza dello schema sull'ultimo micro-batch di dati e aggiorna il percorso dello schema con lo schema più recente unendo nuove colonne alla fine dello schema. I tipi di dati delle colonne esistenti rimangono invariati. Il caricatore automatico supporta diverse modalità per gestire l'evoluzione dello schema, che imposti nell'opzione cloudFiles.schemaEvolutionMode.

È possibile usare hint dello schema per applicare le informazioni sullo schema note e previste in uno schema dedotto. Quando si sa che una colonna è di un tipo di dati specifico o se si desidera scegliere un tipo di dati più generale, ad esempio un valore double anziché un numero intero, è possibile fornire un numero arbitrario di hint per i tipi di dati di colonna come stringa usando la sintassi della specifica dello schema SQL. Quando la colonna di dati salvata è abilitata, i campi denominati in un caso diverso da quello dello schema vengono caricati nella colonna _rescued_data. È possibile modificare questo comportamento impostando l'opzione readerCaseSensitive su false, nel qual caso il caricatore automatico legge i dati in modo senza distinzione tra maiuscole e minuscole.

Esempi

Gli esempi in questa sezione usano un file XML disponibile per il download nel repository GitHub di Apache Spark.

Leggere e scrivere codice XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

È possibile specificare manualmente lo schema durante la lettura dei dati:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API SQL

L'origine dati XML può dedurre i tipi di dati:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

È anche possibile specificare nomi e tipi di colonna in DDL. In questo caso, lo schema non viene dedotto automaticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carica XML utilizzando COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Leggere il codice XML con la convalida delle righe

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analizzare xml annidato (from_xml e schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml e schema_of_xml con l'API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Caricare codice XML con caricatore automatico

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Risorse aggiuntive

Leggere e scrivere dati XML usando la libreria spark-xml