Läsa och skriva XML-filer
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
I den här artikeln beskrivs hur du läser och skriver XML-filer.
Utökningsbart Markup Language (XML) är ett markeringsspråk för formatering, lagring och delning av data i textformat. Den definierar en uppsättning regler för serialisering av data från dokument till godtyckliga datastrukturer.
Stöd för internt XML-filformat möjliggör inmatning, frågekörning och parsning av XML-data för batchbearbetning eller strömning. Den kan automatiskt härleda och utveckla schema- och datatyper, stöder SQL-uttryck som from_xml
och kan generera XML-dokument. Det kräver inte externa burkar och fungerar sömlöst med Auto Loader och read_files
COPY INTO
. Du kan också verifiera varje XML-post på radnivå mot en XML-schemadefinition (XSD).
Krav
Databricks Runtime 14.3 och senare
Parsa XML-poster
XML-specifikationen kräver en välformulerad struktur. Den här specifikationen mappas dock inte direkt till tabellformat. Du måste ange alternativet rowTag
för att ange XML-elementet som mappar till en DataFrame
Row
. Elementet rowTag
blir den översta nivån struct
. De underordnade elementen rowTag
i blir fälten på den översta nivån struct
.
Du kan ange schemat för den här posten eller låta den härledas automatiskt. Eftersom parsern endast undersöker elementen rowTag
filtreras DTD och externa entiteter bort.
Följande exempel illustrerar schemainferens och parsning av en XML-fil med olika rowTag
alternativ:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Läs XML-filen med rowTag
alternativet "books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Utdata:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Läs XML-filen med rowTag
som "bok":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Utdata:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Alternativ för datakälla
Datakällans alternativ för XML kan anges på följande sätt:
- Metoderna
.option/.options
för följande:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Följande inbyggda funktioner:
OPTIONS
Satsen i CREATE TABLE USING DATA_SOURCE
En lista över alternativ finns i Alternativ för automatisk inläsning.
XSD-stöd
Du kan också verifiera varje XML-post på radnivå med en XML-schemadefinition (XSD). XSD-filen anges i alternativet rowValidationXSDPath
. XSD påverkar inte det angivna eller härledda schemat. En post som misslyckas med valideringen markeras som "skadad" och hanteras baserat på alternativet för hantering av skadade poster som beskrivs i alternativavsnittet.
Du kan använda XSDToSchema
för att extrahera ett Spark DataFrame-schema från en XSD-fil. Den stöder endast enkla, komplexa och sekvenstyper och stöder endast grundläggande XSD-funktioner.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
I följande tabell visas konverteringen av XSD-datatyper till Spark-datatyper:
XSD-datatyper | Spark-datatyper |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , negativeInteger , nonNegativeInteger , nonPositiveInteger , , , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Parsa kapslad XML
XML-data i en strängvärdeskolumn i en befintlig DataFrame kan parsas med schema_of_xml
och from_xml
som returnerar schemat och de tolkade resultaten som nya struct
kolumner. XML-data som skickas som ett argument till schema_of_xml
och from_xml
måste vara en enda välformulerad XML-post.
schema_of_xml
Syntax
schema_of_xml(xmlStr [, options] )
Argument
xmlStr
: Ett STRING-uttryck som anger en enda välformulerad XML-post.options
: En valfriMAP<STRING,STRING>
literal som anger direktiv.
Returer
En STRÄNG som innehåller en definition av en struct med n fält med strängar där kolumnnamnen härleds från XML-elementet och attributnamnen. Fältvärdena innehåller de härledda formaterade SQL-typerna.
from_xml
Syntax
from_xml(xmlStr, schema [, options])
Argument
xmlStr
: Ett STRING-uttryck som anger en enda välformulerad XML-post.schema
: Ett STRING-uttryck eller anrop avschema_of_xml
funktionen.options
: En valfriMAP<STRING,STRING>
literal som anger direktiv.
Returer
En struct med fältnamn och typer som matchar schemadefinitionen. Schemat måste definieras som kommaavgränsat kolumnnamn och datatyppar som används i till exempel CREATE TABLE
. De flesta alternativ som visas i alternativen för datakällan gäller med följande undantag:
rowTag
: Eftersom det bara finns en XML-post är alternativetrowTag
inte tillämpligt.mode
(standard:PERMISSIVE
): Tillåter ett läge för att hantera skadade poster under parsning.PERMISSIVE
: När den möter en skadad post placerar du den felaktiga strängen i ett fält som konfigurerats avcolumnNameOfCorruptRecord
och anger felaktiga fält tillnull
. Om du vill behålla skadade poster kan du ange ett strängtypfält med namnetcolumnNameOfCorruptRecord
i ett användardefinierat schema. Om ett schema inte har fältet släpps skadade poster under parsningen. När du härleder ett schema lägger det implicit till ettcolumnNameOfCorruptRecord
fält i ett utdataschema.FAILFAST
: Utlöser ett undantag när det möter skadade poster.
Strukturkonvertering
På grund av strukturskillnaderna mellan DataFrame och XML finns det vissa konverteringsregler från XML-data till DataFrame
och från DataFrame
till XML-data. Observera att hanteringsattribut kan inaktiveras med alternativet excludeAttribute
.
Konvertering från XML till DataFrame
Attribut: Attribut konverteras som fält med rubrikprefixet attributePrefix
.
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
skapar ett schema nedan:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Teckendata i ett element som innehåller attribut eller underordnade element: Dessa parsas i fältet valueTag
. Om det finns flera förekomster av teckendata konverteras fältet valueTag
till en array
typ.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
skapar ett schema nedan:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Konvertering från DataFrame till XML
Element som en matris i en matris: Skriva en XML-fil från DataFrame
att ha ett fält ArrayType
med dess element, liksom ArrayType
ytterligare ett kapslat fält för elementet. Detta skulle inte inträffa vid läsning och skrivning av XML-data, utan när du skriver en DataFrame
läsning från andra källor. Därför har tur och retur i läsning och skrivning av XML-filer samma struktur, men att skriva en DataFrame
läsning från andra källor är möjligt att ha en annan struktur.
DataFrame med ett schema nedan:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
och med data nedan:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
skapar en XML-fil nedan:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Elementnamnet för den namnlösa matrisen DataFrame
i anges av alternativet arrayElementName
(standard: item
).
Datakolumnen Räddad
Den räddade datakolumnen säkerställer att du aldrig förlorar eller går miste om data under ETL. Du kan aktivera den räddade datakolumnen för att samla in data som inte parsats eftersom ett eller flera fält i en post har något av följande problem:
- Frånvarande från det angivna schemat
- Matchar inte datatypen för det angivna schemat
- Har ett ärendematchningsfel med fältnamnen i det angivna schemat
Den räddade datakolumnen returneras som ett JSON-dokument som innehåller kolumnerna som räddades och källfilens sökväg till posten. Om du vill ta bort källfilsökvägen från den räddade datakolumnen kan du ange följande SQL-konfiguration:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Du kan aktivera den räddade datakolumnen genom att ange alternativet rescuedDataColumn
till ett kolumnnamn när du läser data, till exempel _rescued_data
med spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
XML-parsern stöder tre lägen vid parsning av poster: PERMISSIVE
, DROPMALFORMED
och FAILFAST
. När de används tillsammans med rescuedDataColumn
, orsakar inte datatypsmatchningar att poster tas bort i DROPMALFORMED
läge eller utlöser ett fel i FAILFAST
läge. Endast skadade poster (ofullständig eller felaktig XML) tas bort eller utlöser fel.
Schemainferens och utveckling i Auto Loader
En detaljerad beskrivning av det här avsnittet och tillämpliga alternativ finns i Konfigurera schemainferens och utveckling i Automatisk inläsning. Du kan konfigurera automatisk inläsning för att automatiskt identifiera schemat för inlästa XML-data, så att du kan initiera tabeller utan att uttryckligen deklarera dataschemat och utveckla tabellschemat när nya kolumner introduceras. Detta eliminerar behovet av att manuellt spåra och tillämpa schemaändringar över tid.
Som standard försöker schemainferensen för automatisk inläsning undvika problem med schemautveckling på grund av typmatchningar. För format som inte kodar datatyper (JSON, CSV och XML) härleder Auto Loader alla kolumner som strängar, inklusive kapslade fält i XML-filer. Apache Spark DataFrameReader
använder ett annat beteende för schemainferens och väljer datatyper för kolumner i XML-källor baserat på exempeldata. Om du vill aktivera det här beteendet med Auto Loader anger du alternativet cloudFiles.inferColumnTypes
till true
.
Automatisk inläsning identifierar tillägg av nya kolumner när dina data bearbetas. När Auto Loader identifierar en ny kolumn stoppas strömmen med en UnknownFieldException
. Innan strömmen genererar det här felet utför Auto Loader schemainferens på den senaste mikrobatchen med data och uppdaterar schemaplatsen med det senaste schemat genom att slå samman nya kolumner till slutet av schemat. Datatyperna för befintliga kolumner förblir oförändrade. Auto Loader stöder olika lägen för schemautveckling, som du anger i alternativet cloudFiles.schemaEvolutionMode
.
Du kan använda schematips för att framtvinga schemainformationen som du känner till och förväntar dig på ett härledt schema. När du vet att en kolumn är av en specifik datatyp, eller om du vill välja en mer allmän datatyp (till exempel en dubbel i stället för ett heltal), kan du ange ett godtyckligt antal tips för kolumndatatyper som en sträng med sql-schemaspecifikationssyntax. När den räddade datakolumnen är aktiverad läses fält med namnet i ett annat fall än schemat in till _rescued_data
kolumnen. Du kan ändra det här beteendet genom att ange alternativet readerCaseSensitive
till false
, i vilket fall Auto Loader läser data på ett skiftlägesokänsligt sätt.
Exempel
Exemplen i det här avsnittet använder en XML-fil som är tillgänglig för nedladdning på Apache Spark GitHub-lagringsplatsen.
Läsa och skriva XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Du kan ange schemat manuellt när du läser data:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
API för SQL
XML-datakällan kan härleda datatyper:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Du kan också ange kolumnnamn och typer i DDL. I det här fallet härleds inte schemat automatiskt.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Läs in XML med COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Läsa XML med radverifiering
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Parsa kapslad XML (from_xml och schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml och schema_of_xml med SQL API
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Läsa in XML med automatisk inläsning
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)