Odczytywanie i zapisywanie plików XML
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
W tym artykule opisano sposób odczytywania i zapisywania plików XML.
Extensible Markup Language (XML) to język znaczników do formatowania, przechowywania i udostępniania danych w formacie tekstowym. Definiuje zestaw reguł serializacji danych, od dokumentów do dowolnych struktur danych.
Natywna obsługa formatu plików XML umożliwia pozyskiwanie, wykonywanie zapytań i analizowanie danych XML na potrzeby przetwarzania wsadowego lub przesyłania strumieniowego. Może automatycznie wnioskować i rozwijać schemat i typy danych, obsługuje wyrażenia SQL, takie jak from_xml
, i może generować dokumenty XML. Nie wymaga on zewnętrznych plików jar i bezproblemowo współpracuje z modułem automatycznego ładowania read_files
i COPY INTO
. Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza względem definicji schematu XML (XSD).
Wymagania
Środowisko Databricks Runtime w wersji 14.3 lub nowszej
Analizowanie rekordów XML
Specyfikacja XML nakazuje dobrze sformułowaną strukturę. Jednak ta specyfikacja nie jest natychmiast mapowania na format tabelaryczny. Należy określić rowTag
opcję, aby wskazać element XML, który mapuje na DataFrame
Row
element . Element rowTag
staje się najwyższym poziomem struct
. Elementy podrzędne rowTag
stają się polami najwyższego poziomu struct
.
Możesz określić schemat dla tego rekordu lub zezwolić na automatyczne wnioskowanie. Ponieważ analizator sprawdza rowTag
tylko elementy, odfiltrowane są jednostki DTD i zewnętrzne.
W poniższych przykładach przedstawiono wnioskowanie schematu i analizowanie pliku XML przy użyciu różnych rowTag
opcji:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
Przeczytaj plik XML z opcją rowTag
"books":
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
Wyjście:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
Przeczytaj plik XML z ciągiem rowTag
"book":
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
Wyjście:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
Opcje źródła danych
Opcje źródła danych dla kodu XML można określić na następujące sposoby:
- Metody
.option/.options
następujących:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- Następujące wbudowane funkcje:
- Klauzula
OPTIONS
CREATE TABLE USING DATA_SOURCE
Aby uzyskać listę opcji, zobacz Opcje modułu ładującego automatycznego.
Obsługa XSD
Opcjonalnie można zweryfikować każdy rekord XML na poziomie wiersza za pomocą definicji schematu XML (XSD). Plik XSD jest określony w rowValidationXSDPath
opcji . XSD nie ma w inny sposób wpływu na podany lub wywnioskowany schemat. Rekord, który kończy się niepowodzeniem walidacji, jest oznaczony jako "uszkodzony" i obsługiwany na podstawie opcji trybu obsługi uszkodzonych rekordów opisanych w sekcji opcji.
Możesz użyć XSDToSchema
polecenia , aby wyodrębnić schemat ramki danych Spark z pliku XSD. Obsługuje tylko proste, złożone i sekwencyjne typy i obsługuje tylko podstawowe funkcje XSD.
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
W poniższej tabeli przedstawiono konwersję typów danych XSD na typy danych platformy Spark:
Typy danych XSD | Typy danych platformy Spark |
---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short , unsignedByte |
ShortType |
integer , , negativeInteger , nonNegativeInteger , nonPositiveInteger , , positiveInteger unsignedShort |
IntegerType |
long , unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
Analizowanie zagnieżdżonego kodu XML
Dane XML w kolumnie z wartością schema_of_xml
ciągu w istniejącej ramce danych można przeanalizować i from_xml
zwracające schemat i przeanalizowane wyniki jako nowe struct
kolumny. Dane XML przekazywane jako argument do schema_of_xml
i from_xml
muszą być pojedynczym dobrze sformułowanym rekordem XML.
schema_of_xml
Składnia
schema_of_xml(xmlStr [, options] )
Argumenty
xmlStr
: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.options
: opcjonalnyMAP<STRING,STRING>
literał określający dyrektywy.
Zwroty
Ciąg zawierający definicję struktury z n polami ciągów, w których nazwy kolumn pochodzą z elementów XML i nazw atrybutów. Wartości pól przechowują pochodne sformatowane typy SQL.
from_xml
Składnia
from_xml(xmlStr, schema [, options])
Argumenty
xmlStr
: wyrażenie STRING określające pojedynczy poprawnie sformułowany rekord XML.schema
: wyrażenie STRING lub wywołanieschema_of_xml
funkcji.options
: opcjonalnyMAP<STRING,STRING>
literał określający dyrektywy.
Zwroty
Struktura z nazwami pól i typami pasującymi do definicji schematu. Schemat musi być zdefiniowany jako nazwa kolumny rozdzielone przecinkami i pary typów danych, które są używane na przykład CREATE TABLE
. Większość opcji wyświetlanych w opcjach źródła danych ma zastosowanie z następującymi wyjątkami:
rowTag
: Ponieważ istnieje tylko jeden rekord XML,rowTag
opcja nie ma zastosowania.mode
(ustawienie domyślne:PERMISSIVE
): umożliwia tryb radzenia sobie z uszkodzonymi rekordami podczas analizowania.PERMISSIVE
: Gdy spełnia uszkodzony rekord, umieszcza źle sformułowany ciąg w polu skonfigurowanym przezcolumnNameOfCorruptRecord
program i ustawia źle sformułowane pola nanull
wartość . Aby zachować uszkodzone rekordy, można ustawić pole typu ciągu o nazwiecolumnNameOfCorruptRecord
w schemacie zdefiniowanym przez użytkownika. Jeśli schemat nie ma pola, usuwa uszkodzone rekordy podczas analizowania. Podczas wnioskowania schematu niejawnie dodajecolumnNameOfCorruptRecord
pole w schemacie wyjściowym.FAILFAST
: zgłasza wyjątek, gdy spełnia uszkodzone rekordy.
Konwersja struktury
Ze względu na różnice struktury między ramkami danych i xml istnieją pewne reguły konwersji danych XML do DataFrame
i z DataFrame
danych XML. Należy pamiętać, że atrybuty obsługi można wyłączyć za pomocą opcji excludeAttribute
.
Konwersja z xml na ramkę danych
Atrybuty: Atrybuty są konwertowane jako pola z prefiksem attributePrefix
nagłówka .
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
tworzy poniższy schemat:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
Dane znaków w elemecie zawierającym atrybuty lub elementy podrzędne: są one analizowane w valueTag
polu. Jeśli istnieje wiele wystąpień danych znaków, valueTag
pole jest konwertowane na array
typ.
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
tworzy poniższy schemat:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
Konwersja z ramki danych na XML
Element jako tablica w tablicy: Pisanie pliku XML z DataFrame
użyciem pola ArrayType
z jego elementem, tak jak ArrayType
w przypadku dodatkowego zagnieżdżonego pola dla elementu. Nie dzieje się tak w odczytywaniu i zapisywaniu danych XML, ale zapisaniu DataFrame
odczytu z innych źródeł. W związku z tym dwukierunkowe odczytywanie i zapisywanie plików XML ma taką samą strukturę, ale zapisywanie DataFrame
odczytu z innych źródeł jest możliwe, aby mieć inną strukturę.
Ramka danych ze schematem poniżej:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
i z poniższymi danymi:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
tworzy poniższy plik XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
Nazwa elementu tablicy bez nazwy w DataFrame
tablicy jest określona przez opcję arrayElementName
(Wartość domyślna: item
).
Uratowana kolumna danych
Uratowana kolumna danych gwarantuje, że nigdy nie utracisz ani nie przegapisz danych podczas etl. Możesz włączyć uratowaną kolumnę danych, aby przechwycić wszystkie dane, które nie zostały przeanalizowane, ponieważ co najmniej jedno pole w rekordzie ma jeden z następujących problemów:
- Brak podanego schematu
- Nie jest zgodny z typem danych podanego schematu
- Ma niezgodność wielkości liter z nazwami pól w podanym schemacie
Uratowana kolumna danych jest zwracana jako dokument JSON zawierający kolumny, które zostały uratowane, oraz ścieżkę pliku źródłowego rekordu. Aby usunąć ścieżkę pliku źródłowego z uratowanej kolumny danych, możesz ustawić następującą konfigurację SQL:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
Możesz włączyć uratowaną kolumnę danych, ustawiając opcję rescuedDataColumn
na nazwę kolumny podczas odczytywania danych, na przykład _rescued_data
za pomocą polecenia spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)
.
Analizator XML obsługuje trzy tryby podczas analizowania rekordów: PERMISSIVE
, DROPMALFORMED
i FAILFAST
. W przypadku użycia razem z elementem rescuedDataColumn
niezgodność typów danych nie powoduje porzucenia rekordów w DROPMALFORMED
trybie lub zgłaszania błędu w FAILFAST
trybie. Tylko uszkodzone rekordy (niekompletne lub źle sformułowane XML) są porzucane lub zgłaszane błędy.
Wnioskowanie schematu i ewolucja w narzędziu do automatycznego ładowania
Aby zapoznać się ze szczegółowym omówieniem tego tematu i odpowiednimi opcjami, zobacz Konfigurowanie wnioskowania schematu i ewolucji w module automatycznego ładowania. Możesz skonfigurować moduł automatycznego ładowania w celu automatycznego wykrywania schematu załadowanych danych XML, co umożliwia inicjowanie tabel bez jawnego deklarowania schematu danych i rozwijania schematu tabeli w miarę wprowadzania nowych kolumn. Eliminuje to konieczność ręcznego śledzenia i stosowania zmian schematu w czasie.
Domyślnie wnioskowanie schematu modułu automatycznego ładowania ma na celu uniknięcie problemów z ewolucją schematu z powodu niezgodności typów. W przypadku formatów, które nie kodują typów danych (JSON, CSV i XML), moduł ładujący automatycznie wywnioskuje wszystkie kolumny jako ciągi, w tym pola zagnieżdżone w plikach XML. Platforma Apache Spark DataFrameReader
używa innego zachowania w przypadku wnioskowania schematu, wybierając typy danych dla kolumn w źródłach XML na podstawie przykładowych danych. Aby włączyć to zachowanie za pomocą automatycznego modułu ładującego, ustaw opcję cloudFiles.inferColumnTypes
na true
.
Moduł automatycznego ładowania wykrywa dodanie nowych kolumn podczas przetwarzania danych. Gdy funkcja automatycznego ładowania wykryje nową kolumnę, strumień zatrzymuje się przy użyciu elementu UnknownFieldException
. Przed zgłoszeniem tego błędu strumień automatycznie ładujący wykonuje wnioskowanie schematu na najnowszej mikrosadowej partii danych i aktualizuje lokalizację schematu przy użyciu najnowszego schematu, scalając nowe kolumny na końcu schematu. Typy danych istniejących kolumn pozostają niezmienione. Moduł automatycznego ładowania obsługuje różne tryby ewolucji schematu, które można ustawić w opcji cloudFiles.schemaEvolutionMode
.
Możesz użyć wskazówek schematu, aby wymusić informacje o schemacie, które znasz i których oczekujesz na wywnioskowanym schemacie. Jeśli wiesz, że kolumna ma określony typ danych lub jeśli chcesz wybrać bardziej ogólny typ danych (na przykład podwójne zamiast liczby całkowitej), możesz podać dowolną liczbę wskazówek dla typów danych kolumn jako ciąg przy użyciu składni specyfikacji schematu SQL. Po włączeniu uratowanej kolumny danych pola nazwane w przypadku innym niż schemat są ładowane do kolumny _rescued_data
. To zachowanie można zmienić, ustawiając opcję readerCaseSensitive
na false
, w takim przypadku moduł automatycznego ładowania odczytuje dane w sposób bez uwzględniania wielkości liter.
Przykłady
Przykłady w tej sekcji używają pliku XML dostępnego do pobrania w repozytorium GitHub platformy Apache Spark.
Odczytywanie i zapisywanie kodu XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Schemat można określić ręcznie podczas odczytywania danych:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
Interfejs API SQL
Źródło danych XML może wnioskować typy danych:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
Można również określić nazwy i typy kolumn w języku DDL. W takim przypadku schemat nie jest automatycznie wnioskowany.
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
Ładowanie kodu XML przy użyciu FUNKCJI COPY INTO
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
Odczytywanie kodu XML z walidacją wierszy
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
Analizowanie zagnieżdżonego kodu XML (from_xml i schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
from_xml i schema_of_xml przy użyciu interfejsu API SQL
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
Ładowanie kodu XML za pomocą modułu ładującego automatycznego
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)
Dodatkowe zasoby
Odczytywanie i zapisywanie danych XML przy użyciu biblioteki spark-xml