Delen via


XML-gegevens lezen en schrijven met behulp van de spark-xml bibliotheek

Belangrijk

Deze documentatie is buiten gebruik gesteld en wordt mogelijk niet bijgewerkt. De producten, services of technologieën die in zijn inhoud worden genoemd, worden niet officieel goedgekeurd of getest door Databricks.

Ondersteuning voor systeemeigen XML-bestandsindelingen is beschikbaar als openbare preview. Zie XML-bestanden lezen en schrijven.

In dit artikel wordt beschreven hoe u een XML-bestand kunt lezen en schrijven als een Apache Spark-gegevensbron.

Vereisten

  1. Maak de spark-xml bibliotheek als een Maven-bibliotheek. Geef voor de Maven-coördinaat het volgende op:

    • Databricks Runtime 7.x en hoger: com.databricks:spark-xml_2.12:<release>

    Zie spark-xmlReleases voor de nieuwste versie van <release>.

  2. Installeer de bibliotheek op een cluster.

Opmerking

In het voorbeeld in deze sectie wordt het XML-bestand van de boeken gebruikt.

  1. Haal het XML-bestand met boeken op:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Upload het bestand naar DBFS.

XML-gegevens lezen en schrijven

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Opties

  • Lezen
    • path: Locatie van XML-bestanden. Accepteert standaard Hadoop globbing-expressies.
    • rowTag: De rijtag die moet worden behandeld als een rij. In deze XML <books><book><book>...</books>zou de waarde bijvoorbeeld zijn book. Standaard is ROW.
    • samplingRatio: Steekproefverhouding voor het afleiden van het schema (0,0 ~ 1). Standaard is 1. Mogelijke typen zijn StructType, ArrayType, StringType, LongType, DoubleType, BooleanType, TimestampType en NullType, tenzij u een schema opgeeft.
    • excludeAttribute: Of kenmerken in elementen moeten worden uitgesloten. Standaard ingesteld op onwaar.
    • nullValue: De waarde die moet worden behandeld als een null waarde. Standaard is "".
    • mode: De modus voor het omgaan met beschadigde records. Standaard is PERMISSIVE.
      • PERMISSIVE:
        • Wanneer er een beschadigde record optreedt, stelt u alle velden in null en plaatst u de ongeldige tekenreeks in een nieuw veld dat is geconfigureerd door columnNameOfCorruptRecord.
        • Wanneer er een veld van het verkeerde gegevenstype optreedt, stelt u het offending-veld in op null.
      • DROPMALFORMED: negeert beschadigde records.
      • FAILFAST: genereert een uitzondering wanneer beschadigde records worden gedetecteerd.
    • inferSchema: als true, probeert een geschikt type af te leiden voor elke resulterende DataFrame-kolom, zoals een booleaanse, numerieke of datumtype. Als false, zijn alle resulterende kolommen van stringtype. Standaard is true.
    • columnNameOfCorruptRecord: de naam van het nieuwe veld waarin onjuist gevormde tekenreeksen worden opgeslagen. Standaard is _corrupt_record.
    • attributePrefix: het voorvoegsel voor kenmerken, zodat kenmerken en elementen kunnen worden onderscheiden. Dit is het voorvoegsel voor veldnamen. Standaard is _.
    • valueTag: De tag die wordt gebruikt voor de waarde wanneer er kenmerken zijn in een element dat geen onderliggende elementen bevat. Standaard is _VALUE.
    • charset: standaard ingesteld op UTF-8, maar kan worden ingesteld op andere geldige tekensetnamen.
    • ignoreSurroundingSpaces: Of witruimten rondom waarden al dan niet moeten worden overgeslagen. Standaard ingesteld op onwaar.
    • rowValidationXSDPath: Pad naar een XSD-bestand dat wordt gebruikt om de XML voor elke rij te valideren. Rijen die niet kunnen worden gevalideerd, worden behandeld als parseringsfouten zoals hierboven. De XSD heeft verder geen invloed op het opgegeven of afgeleid schema. Als hetzelfde lokale pad nog niet zichtbaar is op de uitvoerders in het cluster, moeten de XSD en eventuele andere personen waaraan het afhankelijk is, worden toegevoegd aan de Spark-uitvoerders met SparkContext.addFile. In dit geval kunt u lokale XSD /foo/bar.xsdgebruiken, aanroepen addFile("/foo/bar.xsd") en doorgeven "bar.xsd" als rowValidationXSDPath.
  • Schrijven
    • path: Locatie voor het schrijven van bestanden.
    • rowTag: De rijtag die moet worden behandeld als een rij. In deze XML <books><book><book>...</books>zou de waarde bijvoorbeeld zijn book. Standaard is ROW.
    • rootTag: de hoofdtag die moet worden behandeld als de hoofdmap. In deze XML <books><book><book>...</books>zou de waarde bijvoorbeeld zijn books. Standaard is ROWS.
    • nullValue: de waarde die moet worden geschreven null . De standaardwaarde is de tekenreeks "null". Wanneer "null", schrijft het geen kenmerken en elementen voor velden.
    • attributePrefix: het voorvoegsel voor kenmerken om kenmerken en elementen te onderscheiden. Dit is het voorvoegsel voor veldnamen. Standaard is _.
    • valueTag: De tag die wordt gebruikt voor de waarde wanneer er kenmerken zijn in een element dat geen onderliggende elementen bevat. Standaard is _VALUE.
    • compression: Compressiecodec die moet worden gebruikt bij het opslaan in een bestand. Moet de volledig gekwalificeerde naam zijn van een klasse die wordt geïmplementeerd org.apache.hadoop.io.compress.CompressionCodec of een van niet-hoofdlettergevoelige korte namen (bzip2, gzip, lz4en ).snappy Standaard is geen compressie.

Ondersteunt het verkorte naamgebruik; U kunt xml in plaats van com.databricks.spark.xml.

XSD-ondersteuning

U kunt afzonderlijke rijen valideren op basis van een XSD-schema met behulp van rowValidationXSDPath.

U gebruikt het hulpprogramma com.databricks.spark.xml.util.XSDToSchema om een Spark DataFrame-schema te extraheren uit enkele XSD-bestanden. Het ondersteunt alleen eenvoudige, complexe en reekstypen, alleen eenvoudige XSD-functionaliteit en is experimenteel.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Geneste XML parseren

Hoewel het voornamelijk wordt gebruikt om een XML-bestand te converteren naar een DataFrame, kunt u ook de methode from_xml gebruiken om XML te parseren in een kolom met tekenreekswaarden in een bestaand DataFrame en deze als een nieuwe kolom met geparseerde resultaten toe te voegen als een struct met:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Notitie

  • mode:
    • Als deze optie is ingesteld op PERMISSIVE, wordt de standaardmodus voor parseren standaard ingesteld op DROPMALFORMED. Als u een kolom in het schema opneemt voor from_xml die overeenkomt met de columnNameOfCorruptRecord, worden in PERMISSIVE modus onjuiste records naar die kolom uitgevoerd in de resulterende struct.
    • Als deze optie is ingesteld op DROPMALFORMED, resulteren XML-waarden die niet correct worden geparseerd in een null waarde voor de kolom. Er worden geen rijen verwijderd.
  • from_xml converteert matrices van tekenreeksen die XML bevatten naar matrices met geparseerde structs. Gebruik in plaats daarvan schema_of_xml_array.
  • from_xml_string is een alternatief voor gebruik in UDF's die rechtstreeks op een tekenreeks werkt in plaats van een kolom.

Conversieregels

Vanwege structurele verschillen tussen DataFrames en XML zijn er enkele conversieregels van XML-gegevens naar DataFrame en van DataFrame naar XML-gegevens. U kunt afhandelingskenmerken uitschakelen met de optie excludeAttribute.

XML converteren naar DataFrame

  • Kenmerken: Kenmerken worden geconverteerd als velden met het voorvoegsel dat is opgegeven in de attributePrefix optie. Als attributePrefix dat het is _, is het document

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    produceert het schema:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Als een element kenmerken heeft maar geen onderliggende elementen, wordt de kenmerkwaarde in een afzonderlijk veld geplaatst dat is opgegeven in de valueTag optie. Als valueTag dat het is _VALUE, is het document

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    produceert het schema:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

DataFrame converteren naar XML

Het schrijven van een XML-bestand vanuit een DataFrame met een veld ArrayType, waarbij het element ArrayType is, zou een extra genest veld voor dat element hebben. Dit gebeurt niet bij het lezen en schrijven van XML-gegevens, maar bij het schrijven van een DataFrame dat uit andere bronnen is gelezen. Daarom heeft roundtrip in het lezen en schrijven van XML-bestanden dezelfde structuur, maar het schrijven van een DataFrame dat uit andere bronnen wordt gelezen, is mogelijk om een andere structuur te hebben.

Een DataFrame met het schema:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

en gegevens:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produceert het XML-bestand:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>