XML-gegevens lezen en schrijven met behulp van de spark-xml
bibliotheek
Belangrijk
Deze documentatie is buiten gebruik gesteld en wordt mogelijk niet bijgewerkt. De producten, services of technologieën die in zijn inhoud worden genoemd, worden niet officieel goedgekeurd of getest door Databricks.
Ondersteuning voor systeemeigen XML-bestandsindelingen is beschikbaar als openbare preview. Zie XML-bestanden lezen en schrijven.
In dit artikel wordt beschreven hoe u een XML-bestand kunt lezen en schrijven als een Apache Spark-gegevensbron.
Vereisten
Maak de
spark-xml
bibliotheek als een Maven-bibliotheek. Geef voor de Maven-coördinaat het volgende op:- Databricks Runtime 7.x en hoger:
com.databricks:spark-xml_2.12:<release>
Zie
spark-xml
Releases voor de nieuwste versie van<release>
.- Databricks Runtime 7.x en hoger:
Installeer de bibliotheek op een cluster.
Opmerking
In het voorbeeld in deze sectie wordt het XML-bestand van de boeken gebruikt.
Haal het XML-bestand met boeken op:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Upload het bestand naar DBFS.
XML-gegevens lezen en schrijven
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Opties
- Lezen
-
path
: Locatie van XML-bestanden. Accepteert standaard Hadoop globbing-expressies. -
rowTag
: De rijtag die moet worden behandeld als een rij. In deze XML<books><book><book>...</books>
zou de waarde bijvoorbeeld zijnbook
. Standaard isROW
. -
samplingRatio
: Steekproefverhouding voor het afleiden van het schema (0,0 ~ 1). Standaard is 1. Mogelijke typen zijnStructType
,ArrayType
,StringType
,LongType
,DoubleType
,BooleanType
,TimestampType
enNullType
, tenzij u een schema opgeeft. -
excludeAttribute
: Of kenmerken in elementen moeten worden uitgesloten. Standaard ingesteld op onwaar. -
nullValue
: De waarde die moet worden behandeld als eennull
waarde. Standaard is""
. -
mode
: De modus voor het omgaan met beschadigde records. Standaard isPERMISSIVE
.-
PERMISSIVE
:- Wanneer er een beschadigde record optreedt, stelt u alle velden in
null
en plaatst u de ongeldige tekenreeks in een nieuw veld dat is geconfigureerd doorcolumnNameOfCorruptRecord
. - Wanneer er een veld van het verkeerde gegevenstype optreedt, stelt u het offending-veld in op
null
.
- Wanneer er een beschadigde record optreedt, stelt u alle velden in
-
DROPMALFORMED
: negeert beschadigde records. -
FAILFAST
: genereert een uitzondering wanneer beschadigde records worden gedetecteerd.
-
-
inferSchema
: alstrue
, probeert een geschikt type af te leiden voor elke resulterende DataFrame-kolom, zoals een booleaanse, numerieke of datumtype. Alsfalse
, zijn alle resulterende kolommen van stringtype. Standaard istrue
. -
columnNameOfCorruptRecord
: de naam van het nieuwe veld waarin onjuist gevormde tekenreeksen worden opgeslagen. Standaard is_corrupt_record
. -
attributePrefix
: het voorvoegsel voor kenmerken, zodat kenmerken en elementen kunnen worden onderscheiden. Dit is het voorvoegsel voor veldnamen. Standaard is_
. -
valueTag
: De tag die wordt gebruikt voor de waarde wanneer er kenmerken zijn in een element dat geen onderliggende elementen bevat. Standaard is_VALUE
. -
charset
: standaard ingesteld opUTF-8
, maar kan worden ingesteld op andere geldige tekensetnamen. -
ignoreSurroundingSpaces
: Of witruimten rondom waarden al dan niet moeten worden overgeslagen. Standaard ingesteld op onwaar. -
rowValidationXSDPath
: Pad naar een XSD-bestand dat wordt gebruikt om de XML voor elke rij te valideren. Rijen die niet kunnen worden gevalideerd, worden behandeld als parseringsfouten zoals hierboven. De XSD heeft verder geen invloed op het opgegeven of afgeleid schema. Als hetzelfde lokale pad nog niet zichtbaar is op de uitvoerders in het cluster, moeten de XSD en eventuele andere personen waaraan het afhankelijk is, worden toegevoegd aan de Spark-uitvoerders met SparkContext.addFile. In dit geval kunt u lokale XSD/foo/bar.xsd
gebruiken, aanroepenaddFile("/foo/bar.xsd")
en doorgeven"bar.xsd"
alsrowValidationXSDPath
.
-
- Schrijven
-
path
: Locatie voor het schrijven van bestanden. -
rowTag
: De rijtag die moet worden behandeld als een rij. In deze XML<books><book><book>...</books>
zou de waarde bijvoorbeeld zijnbook
. Standaard isROW
. -
rootTag
: de hoofdtag die moet worden behandeld als de hoofdmap. In deze XML<books><book><book>...</books>
zou de waarde bijvoorbeeld zijnbooks
. Standaard isROWS
. -
nullValue
: de waarde die moet worden geschrevennull
. De standaardwaarde is de tekenreeks"null"
. Wanneer"null"
, schrijft het geen kenmerken en elementen voor velden. -
attributePrefix
: het voorvoegsel voor kenmerken om kenmerken en elementen te onderscheiden. Dit is het voorvoegsel voor veldnamen. Standaard is_
. -
valueTag
: De tag die wordt gebruikt voor de waarde wanneer er kenmerken zijn in een element dat geen onderliggende elementen bevat. Standaard is_VALUE
. -
compression
: Compressiecodec die moet worden gebruikt bij het opslaan in een bestand. Moet de volledig gekwalificeerde naam zijn van een klasse die wordt geïmplementeerdorg.apache.hadoop.io.compress.CompressionCodec
of een van niet-hoofdlettergevoelige korte namen (bzip2
,gzip
,lz4
en ).snappy
Standaard is geen compressie.
-
Ondersteunt het verkorte naamgebruik; U kunt xml
in plaats van com.databricks.spark.xml
.
XSD-ondersteuning
U kunt afzonderlijke rijen valideren op basis van een XSD-schema met behulp van rowValidationXSDPath
.
U gebruikt het hulpprogramma com.databricks.spark.xml.util.XSDToSchema
om een Spark DataFrame-schema te extraheren uit enkele XSD-bestanden. Het ondersteunt alleen eenvoudige, complexe en reekstypen, alleen eenvoudige XSD-functionaliteit en is experimenteel.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Geneste XML parseren
Hoewel het voornamelijk wordt gebruikt om een XML-bestand te converteren naar een DataFrame, kunt u ook de methode from_xml
gebruiken om XML te parseren in een kolom met tekenreekswaarden in een bestaand DataFrame en deze als een nieuwe kolom met geparseerde resultaten toe te voegen als een struct met:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Notitie
-
mode
:- Als deze optie is ingesteld op
PERMISSIVE
, wordt de standaardmodus voor parseren standaard ingesteld opDROPMALFORMED
. Als u een kolom in het schema opneemt voorfrom_xml
die overeenkomt met decolumnNameOfCorruptRecord
, worden inPERMISSIVE
modus onjuiste records naar die kolom uitgevoerd in de resulterende struct. - Als deze optie is ingesteld op
DROPMALFORMED
, resulteren XML-waarden die niet correct worden geparseerd in eennull
waarde voor de kolom. Er worden geen rijen verwijderd.
- Als deze optie is ingesteld op
-
from_xml
converteert matrices van tekenreeksen die XML bevatten naar matrices met geparseerde structs. Gebruik in plaats daarvanschema_of_xml_array
. -
from_xml_string
is een alternatief voor gebruik in UDF's die rechtstreeks op een tekenreeks werkt in plaats van een kolom.
Conversieregels
Vanwege structurele verschillen tussen DataFrames en XML zijn er enkele conversieregels van XML-gegevens naar DataFrame en van DataFrame naar XML-gegevens. U kunt afhandelingskenmerken uitschakelen met de optie excludeAttribute
.
XML converteren naar DataFrame
Kenmerken: Kenmerken worden geconverteerd als velden met het voorvoegsel dat is opgegeven in de
attributePrefix
optie. AlsattributePrefix
dat het is_
, is het document<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
produceert het schema:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Als een element kenmerken heeft maar geen onderliggende elementen, wordt de kenmerkwaarde in een afzonderlijk veld geplaatst dat is opgegeven in de
valueTag
optie. AlsvalueTag
dat het is_VALUE
, is het document<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
produceert het schema:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
DataFrame converteren naar XML
Het schrijven van een XML-bestand vanuit een DataFrame met een veld ArrayType
, waarbij het element ArrayType
is, zou een extra genest veld voor dat element hebben. Dit gebeurt niet bij het lezen en schrijven van XML-gegevens, maar bij het schrijven van een DataFrame dat uit andere bronnen is gelezen. Daarom heeft roundtrip in het lezen en schrijven van XML-bestanden dezelfde structuur, maar het schrijven van een DataFrame dat uit andere bronnen wordt gelezen, is mogelijk om een andere structuur te hebben.
Een DataFrame met het schema:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
en gegevens:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
produceert het XML-bestand:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>