Lire et écrire des fichiers XML
Important
Cette fonctionnalité est disponible en préversion publique.
Cet article explique comment lire et écrire des fichiers XML.
XML (Extensible Markup Language) est un langage de balisage pour la mise en forme, le stockage et le partage de données au format textuel. Il définit un ensemble de règles pour la sérialisation des données allant de documents à des structures de données arbitraires.
La prise en charge du format de fichier XML natif permet l’ingestion, l’interrogation et l’analyse des données XML pour le traitement par lots ou la diffusion en continu. Il peut déduire et faire évoluer automatiquement le schéma et les types de données, prend en charge des expressions SQL telles que from_xml
, et peut générer des documents XML. Il ne nécessite pas de fichiers jar externes et fonctionne en toute transparence avec le chargeur automatique, read_files
et COPY INTO
. Vous pouvez éventuellement valider chaque enregistrement XML au niveau des lignes par rapport à une définition de schéma XML (XSD).
Spécifications
Databricks Runtime version 14.3 et supérieures
Analyser des enregistrements XML
La spécification XML impose une structure bien formée. Toutefois, cette spécification ne mappe pas immédiatement à un format tabulaire. Vous devez spécifier l’option rowTag
permettant d’indiquer l’élément XML mappé à un DataFrame
Row
. L’élément rowTag
devient le niveau supérieur struct
. Les éléments enfants de rowTag
deviennent les champs du niveau supérieur struct
.
Vous pouvez spécifier le schéma de cet enregistrement ou le laisser être déduit automatiquement. Étant donné que l’analyseur examine uniquement les éléments rowTag
, les entités DTD et externes sont filtrées.
Les exemples suivants illustrent l’inférence de schéma et l’analyse d’un fichier XML à l’aide de différentes options rowTag
:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Lisez le fichier XML avec l’option rowTag
comme « livres » :
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Sortie :
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Lisez le fichier XML avec rowTag
comme « livre » :
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Sortie :
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Options de source de données
Les options de source de données pour XML peuvent être spécifiées de la manière suivante :
- Méthodes
.option/.options
des éléments suivants :- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Les fonctions intégrées suivantes :
- La clause
OPTIONS
de CREATE TABLE USING DATA_SOURCE
Pour obtenir la liste des options, consultez Options du chargeur automatique.
Prise en charge XSD
Vous pouvez éventuellement valider chaque enregistrement XML au niveau des lignes par une définition de schéma XML (XSD). Le fichier XSD est spécifié dans l’option rowValidationXSDPath
. Le XSD n’affecte pas le schéma fourni ou inféré. Un enregistrement qui échoue la validation est marqué comme « endommagé » et traité en fonction de l’option de mode de gestion des enregistrements endommagés décrite dans la section d’option.
Vous pouvez utiliser XSDToSchema
pour extraire un schéma DataFrame Spark à partir d’un fichier XSD. Il prend uniquement en charge les types simples, complexes et séquences, et prend uniquement en charge les fonctionnalités XSD de base.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
Le tableau suivant montre la conversion de types de données XSD en types de données Spark :
Type de données XSD | Types de données Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , positiveInteger , unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Analyser les données XML imbriquées
Les données XML d’une colonne à valeur de chaîne dans un DataFrame existant peuvent être analysées avec schema_of_xml
et from_xml
qui retournent le schéma et les résultats analysés sous forme de nouvelles colonnes struct
. Les données XML transmises en tant qu’argument à schema_of_xml
et from_xml
doivent être un enregistrement XML bien formé.
schema_of_xml
Syntaxe
schema_of_xml(xmlStr [, options] )
Arguments
xmlStr
: expression STRING spécifiant un enregistrement XML bien formé unique.options
: unMAP<STRING,STRING>
littéral optionnel spécifiant les directives.
Renvoie
Un STRING contenant une définition d’un struct avec n champs de chaînes où les noms des colonnes sont dérivés de l’élément XML et des noms d’attributs. Les valeurs de champ contiennent les types SQL mis en forme dérivés.
from_xml
Syntaxe
from_xml(xmlStr, schema [, options])
Arguments
xmlStr
: expression STRING spécifiant un enregistrement XML bien formé unique.schema
: expression STRING ou appel de la fonctionschema_of_xml
.options
: unMAP<STRING,STRING>
littéral optionnel spécifiant les directives.
Renvoie
Struct dont le nom et le type des champs correspondent à la définition de schéma. Le schéma doit être défini en tant que paires nom de colonne-type de données séparées par des virgules, comme dans CREATE TABLE
. La plupart des options affichées dans les options de source de données s’appliquent, à l’exception des éléments suivants :
rowTag
: étant donné qu’il n’existe qu’un seul enregistrement XML, l’optionrowTag
n’est pas applicable.mode
(PERMISSIVE
par défaut) : autorise un mode de traitement des enregistrements endommagés pendant l’analyse.PERMISSIVE
: en présence d’un enregistrement endommagé, place la chaîne malformée dans un champ configuré parcolumnNameOfCorruptRecord
et définit les champs malformés surnull
. Pour conserver les enregistrements corrompus, vous pouvez définir un champ de type chaîne nommécolumnNameOfCorruptRecord
dans un schéma défini par l’utilisateur. Si le schéma est dépourvu de ce champ, les enregistrements endommagés sont supprimés au cours de l’analyse. Lors de l’inférence d’un schéma, un champcolumnNameOfCorruptRecord
est ajouté implicitement dans un schéma de sortie.FAILFAST
: lève une exception en présence d’enregistrements endommagés.
Conversion de structure
En raison des différences de structure entre DataFrame et XML, il existe certaines règles de conversion de données XML vers DataFrame
et de DataFrame
vers les données XML. Notez que la gestion des attributs peut être désactivée avec l’option excludeAttribute
.
Conversion de XML en DataFrame
Attributs : les attributs sont convertis en tant que champs avec le préfixe de titre attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produit un schéma ci-dessous :
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Données de caractères dans un élément contenant des attributs ou des éléments enfants : Celles-ci sont analysées dans le champ valueTag
. S’il existe plusieurs occurrences de données caractères, le champ valueTag
est converti en type array
.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
produit un schéma ci-dessous :
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversion de DataFrame en XML
Élément comme tableau dans un tableau : l’écriture d’un fichier XML à partir de DataFrame
avec un champ ArrayType
et son élément comme ArrayType
aurait un champ imbriqué supplémentaire pour l’élément. Cela ne se produirait pas lors de la lecture et de l’écriture de données XML, mais l’écriture d’un DataFrame
lu à partir d’autres sources. Par conséquent, l’aller-retour dans la lecture et l’écriture des fichiers XML a la même structure, mais l’écriture d’un DataFrame
lu à partir d’autres sources peut avoir une structure différente.
DataFrame avec un schéma ci-dessous :
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
et avec les données ci-dessous :
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produit un fichier XML ci-dessous :
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Le nom de l’élément du tableau sans nom dans le DataFrame
est spécifié par l’option arrayElementName
(par défaut : item
).
Colonne de données récupérées
La colonne des données récupérées garantit que vous ne perdez pas ou ne manquez pas les données pendant l’opération ETL. Vous pouvez activer la colonne de données récupérées pour capturer les données qui n’ont pas été analysées, car un ou plusieurs champs d’un enregistrement présentent l’un des problèmes suivants :
- Absent du schéma fourni
- Ne correspond pas au type de données du schéma fourni
- A une incompatibilité de casse avec les noms de champs dans le schéma fourni
La colonne des données récupérées est renvoyée sous la forme d’un document JSON contenant les colonnes récupérées, ainsi que le chemin d’accès au fichier source de l’enregistrement. Pour supprimer le chemin du fichier source de la colonne de données récupérées, vous pouvez définir la configuration SQL suivante :
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Vous pouvez activer la colonne de données sauvées en définissant l’option rescuedDataColumn
sur un nom de colonne, comme _rescued_data
avec spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
L’analyseur XML prend en charge trois modes d’analyse des enregistrements : PERMISSIVE
, DROPMALFORMED
et FAILFAST
. En cas d’utilisation avec rescuedDataColumn
, les discordances de type de données n’entraînent pas de suppression d’enregistrements en mode DROPMALFORMED
, ou de génération d’erreur en mode FAILFAST
. Seuls les enregistrements endommagés (enregistrements XML malformés ou incomplets) sont annulés ou génèrent des erreurs.
Inférence et évolution de schéma dans le chargeur automatique
Pour une présentation détaillée de cette rubrique et des options applicables, consultez Configurer l’inférence de schéma et l’évolution dans le chargeur automatique. Vous pouvez configurer le chargeur automatique pour détecter automatiquement le schéma des données XML chargées, ce qui vous permet d’initialiser des tables sans déclarer explicitement le schéma de données et de faire évoluer le schéma de table à mesure que de nouvelles colonnes sont introduites. Cela élimine la nécessité de suivre et d’appliquer manuellement les modifications de schéma au fil du temps.
Par défaut, l’inférence de schéma Auto Loader cherche à éviter les problèmes d’évolution du schéma en raison d’incompatibilités de type. Pour les formats qui n’encodent pas les types de données (JSON, CSV et XML), le chargeur automatique déduit toutes les colonnes sous forme de chaînes, y compris les champs imbriqués dans les fichiers XML. Apache Spark DataFrameReader
utilise un comportement différent pour l’inférence de schéma, en sélectionnant des types de données pour les colonnes dans des sources XML en fonction d’exemples de données. Pour activer ce comportement avec Auto Loader, définissez l’option cloudFiles.inferColumnTypes
sur true
.
Auto Loader détecte l’ajout de nouvelles colonnes lors du traitement de vos données. Lorsque le chargeur automatique détecte une nouvelle colonne, le flux s’arrête avec un UnknownFieldException
. Avant que votre stream ne génère cette erreur, Auto Loader effectue l’inférence de schéma sur le dernier micro-batch de données et met à jour l’emplacement du schéma avec le schéma le plus récent en fusionnant les nouvelles colonnes à la fin du schéma. Les types de données des colonnes existantes restent inchangés. Le chargeur automatique prend en charge différents modes pour l’évolution des schémas, que vous définissez dans l’option cloudFiles.schemaEvolutionMode
.
Vous pouvez utiliser des indicateurs de schéma pour appliquer les informations de schéma que vous connaissez et attendez sur un schéma déduit. Lorsque vous savez qu’une colonne est d’un type de données spécifique, ou si vous voulez choisir un type de données plus général (par exemple, un double au lieu d’un entier), vous pouvez fournir un nombre arbitraire d’indices pour les types de données de colonne sous forme de chaîne en utilisant la syntaxe de spécification de schéma SQL. Lorsque la colonne de données récupérées est activée, les champs nommés dans une casse autre que celle du schéma sont chargés dans la colonne _rescued_data
. Vous pouvez modifier ce comportement en définissant l’option readerCaseSensitive
sur false
, auquel cas le chargeur automatique lit les données sans respect de la casse.
Exemples
Les exemples de cette section utilisent un fichier XML disponible en téléchargement dans le référentiel GitHub Apache Spark.
Lire et écrire XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Vous pouvez spécifier manuellement le schéma lors de la lecture des données :
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API SQL
La source de données XML peut déduire les types de données :
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Vous pouvez également spécifier des noms de colonnes et des types dans DDL. Dans ce cas, le schéma n’est pas déduit automatiquement.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Charger du code XML à l’aide de COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Lire du code XML avec validation de ligne
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analyser le code XML imbriqué (from_xml et schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml et schema_of_xml avec l’API SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Charger du code XML avec le chargeur automatique
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Ressources supplémentaires
Lire et écrire des données XML à l’aide de la bibliothèque spark-xml