XML-bestanden lezen en schrijven
Belangrijk
Deze functie is beschikbaar als openbare preview.
In dit artikel wordt beschreven hoe u XML-bestanden leest en schrijft.
Extensible Markup Language (XML) is een opmaaktaal voor het opmaken, opslaan en delen van gegevens in tekstindeling. Hiermee definieert u een set regels voor het serialiseren van gegevens, variërend van documenten tot willekeurige gegevensstructuren.
Systeemeigen XML-bestandsindeling biedt ondersteuning voor opname, query's en parseren van XML-gegevens voor batchverwerking of streaming. Het kan automatisch schema- en gegevenstypen afleiden en ontwikkelen, SQL-expressies zoals from_xml
ondersteunt en XML-documenten kan genereren. Het vereist geen externe jar's en werkt naadloos met Auto Loader, read_files
en COPY INTO
. U kunt desgewenst elke XML-record op rijniveau valideren op een XML-schemadefinitie (XSD).
Vereisten
Databricks Runtime 14.3 en hoger
XML-records parseren
XML-specificatie vereist een goed gevormde structuur. Deze specificatie wordt echter niet onmiddellijk toegewezen aan een tabellaire indeling. U moet de rowTag
optie opgeven om het XML-element aan te geven dat wordt toegewezen aan een DataFrame
Row
. Het rowTag
element wordt het hoogste niveau struct
. De onderliggende elementen worden rowTag
de velden van het hoogste niveau struct
.
U kunt het schema voor deze record opgeven of deze automatisch laten afleiden. Omdat de parser alleen de rowTag
elementen onderzoekt, worden DTD en externe entiteiten uitgefilterd.
In de volgende voorbeelden ziet u schemadeductie en parsering van een XML-bestand met behulp van verschillende rowTag
opties:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Lees het XML-bestand met rowTag
de optie 'boeken':
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Uitvoer:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Lees het XML-bestand met rowTag
als 'boek':
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Uitvoer:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opties voor gegevensbronnen
Opties voor gegevensbronnen voor XML kunnen op de volgende manieren worden opgegeven:
- De
.option/.options
methoden van het volgende:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- De volgende ingebouwde functies:
- De
OPTIONS
clausule van CREATE TABLE USING DATA_SOURCE
Zie Opties voor automatisch ladenvoor een lijst met opties.
XSD-ondersteuning
U kunt desgewenst elke XML-record op rijniveau valideren door een XML-schemadefinitie (XSD). Het XSD-bestand wordt opgegeven in de rowValidationXSDPath
optie. De XSD heeft verder geen invloed op het opgegeven of afgeleid schema. Een record die mislukt door de validatie, wordt gemarkeerd als beschadigd en verwerkt op basis van de optie voor het verwerken van beschadigde records die in de optiesectie wordt beschreven.
U kunt XSDToSchema
gebruiken om een Spark DataFrame-schema uit een XSD-bestand te extraheren. Het ondersteunt alleen eenvoudige, complexe en reekstypen en biedt alleen ondersteuning voor eenvoudige XSD-functionaliteit.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
In de volgende tabel ziet u de conversie van XSD-gegevenstypen naar Spark-gegevenstypen:
XSD-gegevenstypen | Spark-gegevenstypen |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , , negativeInteger nonNegativeInteger , nonPositiveInteger , , , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Geneste XML parseren
XML-gegevens in een kolom met tekenreekswaarden in een bestaand DataFrame kunnen worden geparseerd met schema_of_xml
en from_xml
die het schema en de geparseerde resultaten als nieuwe struct
kolommen retourneren. XML-gegevens die als argument schema_of_xml
worden doorgegeven en from_xml
moeten één goed opgemaakt XML-record zijn.
schema_of_xml
Syntaxis
schema_of_xml(xmlStr [, options] )
Argumenten
-
xmlStr
: Een TEKENREEKS-expressie die één goed opgemaakt XML-record aangeeft. -
options
: Een optionele letterlijkeMAP<STRING,STRING>
waarde die instructies aangeeft.
Retouren
Een TEKENREEKS met een definitie van een struct met n velden met tekenreeksen waarvan de kolomnamen zijn afgeleid van het XML-element en de kenmerknamen. De veldwaarden bevatten de afgeleide, opgemaakte SQL-typen.
from_xml
Syntaxis
from_xml(xmlStr, schema [, options])
Argumenten
-
xmlStr
: Een TEKENREEKS-expressie die één goed opgemaakt XML-record aangeeft. -
schema
: Een TEKENREEKS-expressie of aanroep van deschema_of_xml
functie. -
options
: Een optionele letterlijkeMAP<STRING,STRING>
waarde die instructies aangeeft.
Retouren
Een struct met veldnamen en -typen die overeenkomen met de schemadefinitie. Het schema moet worden gedefinieerd als door komma's gescheiden kolomnaam en gegevenstypeparen zoals wordt gebruikt in bijvoorbeeld CREATE TABLE
. De meeste opties die worden weergegeven in de opties voor gegevensbronnen zijn van toepassing met de volgende uitzonderingen:
-
rowTag
: Omdat er slechts één XML-record is, is derowTag
optie niet van toepassing. -
mode
(standaard:PERMISSIVE
): Hiermee kan een modus worden gebruikt voor het verwerken van beschadigde records tijdens het parseren.-
PERMISSIVE
: Wanneer deze voldoet aan een beschadigde record, plaatst u de ongeldige tekenreeks in een veld dat is geconfigureerd doorcolumnNameOfCorruptRecord
en stelt u onjuiste velden in opnull
. Als u beschadigde records wilt behouden, kunt u een tekenreekstypeveld met de naamcolumnNameOfCorruptRecord
instellen in een door de gebruiker gedefinieerd schema. Als een schema het veld niet heeft, worden beschadigde records verwijderd tijdens het parseren. Bij het afleiden van een schema wordt er impliciet eencolumnNameOfCorruptRecord
-veld aan een uitvoerschema toegevoegd. -
FAILFAST
: Genereert een uitzondering wanneer deze voldoet aan beschadigde records.
-
Structuurconversie
Vanwege de structuurverschillen tussen DataFrame en XML zijn er enkele conversieregels van XML-gegevens naar DataFrame
en van DataFrame
naar XML-gegevens. Houd er rekening mee dat afhandelingskenmerken kunnen worden uitgeschakeld met de optie excludeAttribute
.
Conversie van XML naar DataFrame
Kenmerken: Kenmerken worden geconverteerd als velden met het kopvoorvoegsel attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
produceert hieronder een schema:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Tekengegevens in een element met kenmerken of onderliggende elementen: deze worden geparseerd in het valueTag
veld. Als er meerdere exemplaren van tekengegevens zijn, wordt het valueTag
veld geconverteerd naar een array
type.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
produceert hieronder een schema:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Conversie van DataFrame naar XML
element als een array in een array: Bij het schrijven van een XML-bestand van DataFrame
dat een veld ArrayType
heeft met zijn element als ArrayType
, zou er een extra genest veld voor het element zijn. Dit gebeurt niet bij het lezen en schrijven van XML-gegevens, maar het schrijven van een DataFrame
leesbewerking uit andere bronnen. Daarom heeft roundtrip in het lezen en schrijven van XML-bestanden dezelfde structuur, maar het schrijven van een DataFrame
leesbewerking uit andere bronnen is mogelijk om een andere structuur te hebben.
DataFrame met een schema hieronder:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
en met de onderstaande gegevens:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produceert hieronder een XML-bestand:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
De elementnaam van de niet-benoemde matrix in de DataFrame
matrix wordt opgegeven door de optie arrayElementName
(standaard: item
).
Kolom met geredde gegevens
De kolom met geredde gegevens zorgt ervoor dat u nooit gegevens kwijtraakt of mist tijdens ETL. U kunt de opgeslagen gegevenskolom inschakelen om gegevens vast te leggen die niet zijn geparseerd omdat een of meer velden in een record een van de volgende problemen hebben:
- Afwezig uit het opgegeven schema
- Komt niet overeen met het gegevenstype van het opgegeven schema
- Komt niet overeen met de veldnamen in het opgegeven schema
De kolom met geredde gegevens wordt geretourneerd als een JSON-document met de kolommen die zijn gered en het bronbestandspad van de record. Als u het bronbestandspad uit de kolom met geredde gegevens wilt verwijderen, kunt u de volgende SQL-configuratie instellen:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
U kunt de kolom met geredde gegevens inschakelen door de optie rescuedDataColumn
in te stellen op een kolomnaam bij het lezen van gegevens, zoals _rescued_data
met spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
De XML-parser ondersteunt drie modi bij het parseren van records: PERMISSIVE
, DROPMALFORMED
en FAILFAST
. Wanneer gegevenstypen samen met rescuedDataColumn
elkaar worden gebruikt, komen records niet overeen in DROPMALFORMED
de modus of veroorzaken ze een fout in FAILFAST
de modus. Alleen beschadigde records (onvolledige of onjuiste XML) worden verwijderd of fouten veroorzaakt.
Automatische schema-afleiding en evolutie in Auto Loader
Zie Schemadeductie en evolutie configureren in AutoLoadervoor een gedetailleerde bespreking van dit onderwerp en de toepasselijke opties. U kunt automatisch laden configureren om het schema van geladen XML-gegevens automatisch te detecteren, zodat u tabellen kunt initialiseren zonder expliciet het gegevensschema te declareren en het tabelschema te ontwikkelen naarmate er nieuwe kolommen worden geïntroduceerd. Hierdoor hoeft u geen schemawijzigingen handmatig bij te houden en toe te passen in de loop van de tijd.
Bij standaardinstellingen probeert Auto Loader-schema-inferentie problemen met schema-evolutie te vermijden vanwege typemismatches. Voor bestandsindelingen die geen gegevenstypen coderen (JSON, CSV en XML), worden alle kolommen als tekenreeksen geïnterpreteerd, inclusief geneste velden in XML-bestanden. De Apache Spark-DataFrameReader
gebruikt een ander gedrag voor schemadeductie, waarbij gegevenstypen worden geselecteerd voor kolommen in XML-bronnen op basis van voorbeeldgegevens. Als u dit gedrag wilt inschakelen met automatisch laden, stelt u de optie cloudFiles.inferColumnTypes
in op true
.
Auto Loader detecteert de toevoeging van nieuwe kolommen terwijl deze uw gegevens verwerkt. Wanneer Automatisch Laden een nieuwe kolom detecteert, wordt de stroom gestopt met een UnknownFieldException
. Voordat uw stream deze fout genereert, voert Auto Loader schemadeductie uit op de meest recente microbatch met gegevens en werkt de schemalocatie bij met het nieuwste schema door nieuwe kolommen samen te voegen aan het einde van het schema. De gegevenstypen van bestaande kolommen blijven ongewijzigd. Auto Loader ondersteunt verschillende modi voor schemaontwikkeling, die u instelt in de optie cloudFiles.schemaEvolutionMode
.
U kunt schemahints gebruiken om de schemagegevens af te dwingen die u kent en verwacht in een afgeleid schema. Wanneer u weet dat een kolom van een specifiek gegevenstype is of als u een meer algemeen gegevenstype wilt kiezen (bijvoorbeeld een dubbele waarde in plaats van een geheel getal), kunt u een willekeurig aantal hints voor kolomgegevenstypen opgeven als een tekenreeks met de syntaxis van de SQL-schemaspecificatie. Wanneer de kolom met geredde gegevens is ingeschakeld, worden velden met een andere naam dan die van het schema geladen in de _rescued_data
kolom. U kunt dit gedrag wijzigen door de optie readerCaseSensitive
in te false
stellen op, in welk geval Automatisch laadprogramma gegevens op een niet-hoofdlettergevoelige manier leest.
Voorbeelden
In de voorbeelden in deze sectie wordt een XML-bestand gebruikt dat kan worden gedownload in de Apache Spark GitHub-opslagplaats.
XML lezen en schrijven
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
U kunt het schema handmatig opgeven bij het lezen van gegevens:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL-API
XML-gegevensbron kan gegevenstypen afleiden:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
U kunt ook kolomnamen en -typen opgeven in DDL. In dit geval wordt het schema niet automatisch afgeleid.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
XML laden met COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
XML lezen met rijvalidatie
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Geneste XML parseren (from_xml en schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml en schema_of_xml met SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
XML laden met automatisch laadprogramma
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Aanvullende bronnen
XML-gegevens lezen en schrijven met behulp van de spark-XML-bibliotheek