Udostępnij za pośrednictwem


Odczytywanie i zapisywanie plików XML

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

W tym artykule opisano sposób odczytywania i zapisywania plików XML.

Extensible Markup Language (XML) to język znaczników do formatowania, przechowywania i udostępniania danych w formacie tekstowym. Definiuje set reguł serializacji danych od dokumentów do dowolnych struktur danych.

Natywna obsługa formatu plików XML umożliwia pozyskiwanie, wykonywanie zapytań i analizowanie danych XML na potrzeby przetwarzania wsadowego lub przesyłania strumieniowego. Może automatycznie wnioskować i rozwijać schema i typy danych, obsługuje wyrażenia SQL, takie jak from_xml, i może generate dokumenty XML. Nie wymaga on zewnętrznych plików jar i bezproblemowo współpracuje z modułem automatycznego ładowania read_files i COPY INTO. Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza względem definicji Schema XML (XSD).

Wymagania

Środowisko Databricks Runtime w wersji 14.3 lub nowszej

Analizowanie rekordów XML

Specyfikacja XML nakazuje dobrze sformułowaną strukturę. Jednak ta specyfikacja nie jest natychmiast mapowania na format tabelaryczny. Należy określić rowTag opcję, aby wskazać element XML, który mapuje na DataFrameRowelement . Element rowTag staje się najwyższym poziomem struct. Elementy podrzędne rowTag stają się polami najwyższego poziomu struct.

Możesz określić schema dla tego rekordu lub zezwolić na automatyczne wnioskowanie. Ponieważ analizator sprawdza rowTag tylko elementy, odfiltrowane są jednostki DTD i zewnętrzne.

W poniższych przykładach przedstawiono schema wnioskowanie i analizowanie pliku XML przy użyciu różnych opcji rowTag:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Przeczytaj plik XML z opcją rowTag "books":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Wyjście:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Przeczytaj plik XML z ciągiem rowTag "book":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Wyjście:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opcje źródła danych

Opcje źródła danych dla kodu XML można określić na następujące sposoby:

Aby uzyskać listę list opcji, zobacz Opcje automatycznego ładownika.

Obsługa XSD

Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza za pomocą definicji Schema XML (XSD). Plik XSD jest określony w rowValidationXSDPath opcji . XSD nie ma w żaden inny sposób wpływu na schema podane lub wnioskowane. Rekord, który kończy się niepowodzeniem walidacji, jest oznaczony jako "uszkodzony" i obsługiwany na podstawie opcji trybu obsługi uszkodzonych rekordów opisanych w sekcji opcji.

Za pomocą XSDToSchema można wyodrębnić schema ramki danych spark z pliku XSD. Obsługuje tylko proste, złożone i sekwencyjne typy i obsługuje tylko podstawowe funkcje XSD.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Poniższy table przedstawia konwersję typów danych XSD na typy danych Spark.

Typy danych XSD Typy danych platformy Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, , negativeInteger, nonNegativeInteger, nonPositiveInteger, , positiveIntegerunsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analizowanie zagnieżdżonego kodu XML

Dane XML w column wartości ciągu w istniejącej ramce danych mogą być analizowane przy użyciu schema_of_xml i from_xml, które zwracają schema i przeanalizowane wyniki jako nowe structcolumns. Dane XML przekazywane jako argument do schema_of_xml i from_xml muszą być pojedynczym dobrze sformułowanym rekordem XML.

schema_of_xml

Składnia

schema_of_xml(xmlStr [, options] )

Argumenty

  • xmlStr: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.
  • options: opcjonalny MAP<STRING,STRING> literał określający dyrektywy.

Zwroty

Ciąg zawierający definicję struktury z n polami stringów, w których nazwy where i column pochodzą z nazw elementów i atrybutów XML. Pole values przechowuje pochodne i sformatowane typy SQL.

from_xml

Składnia

from_xml(xmlStr, schema [, options])

Argumenty

  • xmlStr: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.
  • schema: wyrażenie STRING lub wywołanie schema_of_xml funkcji.
  • options: opcjonalny MAP<STRING,STRING> literał określający dyrektywy.

Zwroty

Struktura z nazwami pól i typami pasującymi do definicji schema. Schema należy zdefiniować jako pary nazw column i typów danych rozdzielane przecinkami, na przykład CREATE TABLE. Większość opcji wyświetlanych w opcjach źródła danych ma zastosowanie z następującymi wyjątkami:

  • rowTag: Ponieważ istnieje tylko jeden rekord XML, rowTag opcja nie ma zastosowania.
  • mode (ustawienie domyślne: PERMISSIVE): umożliwia tryb radzenia sobie z uszkodzonymi rekordami podczas analizowania.
    • PERMISSIVE: Gdy spełnia uszkodzony rekord, umieszcza źle sformułowany ciąg w polu skonfigurowanym przez columnNameOfCorruptRecordprogram i ustawia źle sformułowane pola na nullwartość . Aby zachować uszkodzone rekordy, można set pole typu ciągu o nazwie columnNameOfCorruptRecord w schemazdefiniowanym przez użytkownika. Jeśli schema nie ma pola, usuwa uszkodzone rekordy podczas parsowania. Podczas wnioskowania schema, niejawnie dodaje się pole columnNameOfCorruptRecord do danych wyjściowych schema.
    • FAILFAST: zgłasza wyjątek, gdy spełnia uszkodzone rekordy.

Konwersja struktury

Ze względu na różnice struktury między ramkami danych i xml istnieją pewne reguły konwersji danych XML do DataFrame i z DataFrame danych XML. Należy pamiętać, że atrybuty obsługi można wyłączyć za pomocą opcji excludeAttribute.

Konwersja z xml na ramkę danych

Atrybuty: Atrybuty są konwertowane jako pola z prefiksem attributePrefixnagłówka .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

tworzy schema poniżej:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dane znaków w elemecie zawierającym atrybuty lub elementy podrzędne: są one analizowane w valueTag polu. Jeśli istnieje wiele wystąpień danych znaków, valueTag pole jest konwertowane na array typ.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

tworzy schema poniżej:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Konwersja z ramki danych na XML

Element jako tablica w tablicy: Pisanie pliku XML z DataFramehavingArrayType pola z jego elementem jako ArrayType będzie miało dodatkowe zagnieżdżone pole dla elementu. Nie dzieje się tak w odczytywaniu i zapisywaniu danych XML, ale zapisaniu DataFrame odczytu z innych źródeł. W związku z tym dwukierunkowe odczytywanie i zapisywanie plików XML ma taką samą strukturę, ale zapisywanie DataFrame odczytu z innych źródeł jest możliwe, aby mieć inną strukturę.

Ramka danych z schema poniżej:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

i z poniższymi danymi:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

tworzy poniższy plik XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

Nazwa elementu tablicy bez nazwy w DataFrame tablicy jest określona przez opcję arrayElementName (Wartość domyślna: item).

Uratowane dane column

Uratowane dane column zapewniają, że nigdy nie utracisz ani nie przegapisz danych podczas ETL. Możesz włączyć odzyskane dane column, aby przechwycić wszystkie dane, które nie zostały przeanalizowane, ponieważ co najmniej jedno pole w rekordzie ma jeden z następujących problemów:

  • Nieobecny w podanym schema
  • Nie jest zgodny z typem danych podanego schema
  • Ma niezgodność wielkości liter z nazwami pól w podanym schema

Uratowane dane column są zwracane jako dokument JSON zawierający columns, które zostały uratowane, oraz ścieżkę pliku źródłowego rekordu. Aby remove z uratowanych danych columnścieżkę pliku źródłowego, możesz set użyć następującej konfiguracji SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Możesz umożliwić dostęp do uratowanych danych column, ustawiając opcję rescuedDataColumn na nazwę column podczas odczytywania danych, takich jak _rescued_data z wykorzystaniem spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

Analizator XML obsługuje trzy tryby podczas analizowania rekordów: PERMISSIVE, DROPMALFORMEDi FAILFAST. W przypadku użycia razem z elementem rescuedDataColumnniezgodność typów danych nie powoduje porzucenia rekordów w DROPMALFORMED trybie lub zgłaszania błędu w FAILFAST trybie. Tylko uszkodzone rekordy (niekompletne lub źle sformułowane XML) są porzucane lub zgłaszane błędy.

Schema wnioskowanie i ewolucja w Auto Loaderze

Aby zapoznać się ze szczegółowym omówieniem tego tematu i odpowiednimi opcjami, zobacz Configure inference and evolution in Auto Loader(Konfigurowanie wnioskowania i ewolucji w usłudze Auto Loader). Możesz skonfigurować Auto Loader do automatycznego wykrywania schema załadowanych danych XML, dzięki czemu można zainicjować tables bez jawnego deklarowania schema danych i ewoluować tableschema w miarę wprowadzania nowych columns. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schema z upływem czasu.

Automatyczny moduł ładujący schema z domyślnym wnioskowaniem ma na celu unikanie problemów schema związanych z ewolucją z powodu niezgodności typów. W przypadku formatów, które nie kodują typów danych (JSON, CSV i XML), moduł ładujący automatycznie wywnioskuje wszystkie columns jako ciągi, w tym zagnieżdżone pola w plikach XML. Usługa Apache Spark DataFrameReader używa innego zachowania do wnioskowania schema, wybierając typy danych dla columns w źródłach XML na podstawie przykładowych danych. Aby włączyć to zachowanie za pomocą Automatycznego ładowania, set opcję cloudFiles.inferColumnTypestrue.

Funkcja automatycznego ładowania wykrywa dodanie nowych columns podczas przetwarzania danych. Gdy moduł automatycznego ładowania wykryje nowy column, strumień zatrzymuje się przy użyciu UnknownFieldException. Przed zgłoszeniem tego błędu, Automatyczny Ładowacz wykonuje schema wnioskowanie na najnowszej mikropartii danych i aktualizuje lokalizację schema za pomocą najnowszych schema przez scalenie nowych columns do końca schema. Typy danych istniejących columns pozostają niezmienione. Moduł automatycznego ładowania obsługuje różne tryby dla schema ewolucji, które set w opcji cloudFiles.schemaEvolutionMode.

Możesz użyć wskazówek schema, aby wymusić informacje schema, które znasz i oczekujesz dla wnioskowanego schema. Jeśli wiesz, że column jest określonym typem danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład podwójne zamiast liczby całkowitej), możesz podać dowolną liczbę wskazówek dla column typów danych jako ciągu przy użyciu składni specyfikacji SQL schema. Po włączeniu odzyskanych danych column pola nazwane w sposób inny niż schema są załadowane do _rescued_datacolumn. To zachowanie można zmienić, ustawiając opcję readerCaseSensitive na false, w takim przypadku moduł automatycznego ładowania odczytuje dane w sposób bez uwzględniania wielkości liter.

Przykłady

Przykłady w tej sekcji używają pliku XML dostępnego do pobrania w repozytorium GitHub platformy Apache Spark.

Odczytywanie i zapisywanie kodu XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Podczas odczytywania danych można ręcznie określić schema:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Interfejs API SQL

Źródło danych XML może wnioskować typy danych:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Można również określić nazwy i typy column w języku DDL. W tym przypadku schema nie jest automatycznie wnioskowane.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Ładowanie kodu XML przy użyciu COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Odczytywanie kodu XML z walidacją wierszy

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analizowanie zagnieżdżonego kodu XML (from_xml i schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml i schema_of_xml przy użyciu interfejsu API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Ładowanie kodu XML za pomocą modułu ładującego automatycznego

Python

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(availableNow=True)
  .toTable("table_name")
)

Scala

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow()
  .toTable("table_name")
  )

Dodatkowe zasoby

Odczytywanie i zapisywanie danych XML przy użyciu biblioteki spark-xml