Odczytywanie i zapisywanie danych XML przy użyciu biblioteki spark-xml
Ważne
Ta dokumentacja została wycofana i może nie zostać zaktualizowana. Produkty, usługi lub technologie wymienione w treści nie są oficjalnie zatwierdzane ani testowane przez usługę Databricks.
Natywna obsługa formatu pliku XML jest dostępna jako publiczna wersja zapoznawcza. Zobacz Odczytywanie i zapisywanie plików XML.
W tym artykule opisano sposób odczytywania i zapisywania pliku XML jako źródła danych platformy Apache Spark.
Wymagania
Utwórz bibliotekę
spark-xml
jako bibliotekę Maven. Dla współrzędnych Maven określ:- Środowisko Databricks Runtime 7.x i nowsze:
com.databricks:spark-xml_2.12:<release>
Zobacz
spark-xml
Wydania dla najnowszej wersji programu<release>
.- Środowisko Databricks Runtime 7.x i nowsze:
Zainstaluj bibliotekę w klastrze.
Przykład
W przykładzie w tej sekcji jest używany plik XML książek .
Pobierz plik XML książek:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
Przekaż plik do systemu plików DBFS.
Odczytywanie i zapisywanie danych XML
SQL
/*Infer schema*/
CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
/*Specify column names and types*/
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")
Scala
// Infer schema
import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
val df = spark.read
.option("rowTag", "book")
.xml("dbfs:/books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
// Specify schema
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("dbfs:/newbooks.xml")
R
# Infer schema
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")
# Specify schema
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
Opcje
- Czytać
path
: lokalizacja plików XML. Akceptuje standardowe wyrażenia globbing platformy Hadoop.rowTag
: tag wiersza, który ma być traktowany jako wiersz. Na przykład w tym pliku XML<books><book><book>...</books>
wartość tobook
. Wartość domyślna toROW
.samplingRatio
: Współczynnik próbkowania dla schematu wnioskowania (0,0 ~ 1). Wartość domyślna to 1. Możliwe typy toStructType
, ,StringType
DoubleType
LongType
BooleanType
ArrayType
TimestampType
, iNullType
, chyba że podajesz schemat.excludeAttribute
: czy wykluczać atrybuty w elementach. Wartość domyślna to „fałsz”.nullValue
: wartość, która ma być traktowananull
jako wartość. Wartość domyślna to""
.mode
: tryb radzenia sobie z uszkodzonymi rekordami. Wartość domyślna toPERMISSIVE
.PERMISSIVE
:- Gdy napotka uszkodzony rekord, ustawia wszystkie pola na
null
i umieszcza źle sformułowany ciąg w nowym polu skonfigurowanym przezcolumnNameOfCorruptRecord
program . - W przypadku napotkania pola nieprawidłowego typu danych ustawia pole obraźliwe na
null
wartość .
- Gdy napotka uszkodzony rekord, ustawia wszystkie pola na
DROPMALFORMED
: ignoruje uszkodzone rekordy.FAILFAST
: zgłasza wyjątek podczas wykrywania uszkodzonych rekordów.
inferSchema
: jeślitrue
program próbuje wywnioskować odpowiedni typ dla każdej wynikowej kolumny DataFrame, na przykład typu wartości logicznej, liczbowej lub daty. Jeślifalse
wszystkie wynikowe kolumny mają typ ciągu. Wartość domyślna totrue
.columnNameOfCorruptRecord
: nazwa nowego pola, w którym są przechowywane źle sformułowane ciągi. Wartość domyślna to_corrupt_record
.attributePrefix
: prefiks atrybutów, aby rozróżniać atrybuty i elementy. Jest to prefiks nazw pól. Wartość domyślna to_
.valueTag
: tag używany dla wartości, gdy istnieją atrybuty w elemecie, który nie ma elementów podrzędnych. Wartość domyślna to_VALUE
.charset
: wartości domyślne,UTF-8
ale można ustawić na inne prawidłowe nazwy znaków.ignoreSurroundingSpaces
: Czy należy pominąć odstępy otaczające wartości. Wartość domyślna to „fałsz”.rowValidationXSDPath
: ścieżka do pliku XSD używanego do sprawdzania poprawności kodu XML dla każdego wiersza. Wiersze, które nie mogą sprawdzić poprawności, są traktowane jak błędy analizy, jak powyżej. XSD nie ma w inny sposób wpływu na podany lub wywnioskowany schemat. Jeśli ta sama ścieżka lokalna nie jest jeszcze widoczna w funkcjach wykonawczych w klastrze, to XSD i inne, od których zależy, powinny zostać dodane do funkcji wykonawczych platformy Spark za pomocą elementu SparkContext.addFile. W tym przypadku, aby użyć lokalnego XSD/foo/bar.xsd
, wywołajaddFile("/foo/bar.xsd")
i przekaż"bar.xsd"
jakorowValidationXSDPath
.
- Pisać
path
: lokalizacja do zapisu plików.rowTag
: tag wiersza, który ma być traktowany jako wiersz. Na przykład w tym pliku XML<books><book><book>...</books>
wartość tobook
. Wartość domyślna toROW
.rootTag
: tag główny, który ma być traktowany jako katalog główny. Na przykład w tym pliku XML<books><book><book>...</books>
wartość tobooks
. Wartość domyślna toROWS
.nullValue
: wartość do zapisanianull
wartości. Wartość domyślna to ciąg"null"
. Gdy"null"
parametr nie zapisuje atrybutów i elementów dla pól.attributePrefix
: prefiks atrybutów do rozróżniania atrybutów i elementów. Jest to prefiks nazw pól. Wartość domyślna to_
.valueTag
: tag używany dla wartości, gdy istnieją atrybuty w elemecie, który nie ma elementów podrzędnych. Wartość domyślna to_VALUE
.compression
: koder koder-dekoder kompresji używany podczas zapisywania w pliku. Powinna być w pełni kwalifikowaną nazwą klasy implementowaniaorg.apache.hadoop.io.compress.CompressionCodec
lub jedną z nazw krótkich bez uwzględniania wielkości liter (bzip2
,gzip
,lz4
isnappy
). Wartość domyślna nie jest kompresją.
Obsługuje skrócone użycie nazwy; Możesz użyć xml
zamiast com.databricks.spark.xml
.
Obsługa XSD
Poszczególne wiersze można zweryfikować względem schematu XSD przy użyciu polecenia rowValidationXSDPath
.
Narzędzie służy com.databricks.spark.xml.util.XSDToSchema
do wyodrębniania schematu ramki danych Platformy Spark z niektórych plików XSD. Obsługuje tylko proste, złożone i sekwencyjne typy, tylko podstawowe funkcje XSD i jest eksperymentalne.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Analizowanie zagnieżdżonego kodu XML
Mimo że służy głównie do konwertowania pliku XML na ramkę danych, można również użyć from_xml
metody do analizowania kodu XML w kolumnie wartości ciągu w istniejącej ramce danych i dodać ją jako nową kolumnę z przeanalizowanymi wynikami jako strukturą:
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
Uwaga
mode
:- Jeśli ustawiono
PERMISSIVE
wartość , domyślny tryb analizy jest domyślnie ustawiony naDROPMALFORMED
. Jeśli dołączysz kolumnę do schematu,from_xml
która jest zgodna z parametremcolumnNameOfCorruptRecord
,PERMISSIVE
tryb zwraca źle sformułowane rekordy do tej kolumny w wynikowej struktury. - Jeśli ustawiono
DROPMALFORMED
wartość , wartości XML, które nie analizują poprawnie, powodująnull
wyświetlenie wartości dla kolumny. Wiersze nie są porzucane.
- Jeśli ustawiono
from_xml
Konwertuje tablice ciągów zawierających kod XML na tablice analizowanych struktur. Użycie w zamian parametruschema_of_xml_array
.from_xml_string
jest alternatywą do użycia w funkcjach zdefiniowanych przez użytkownika, które działają bezpośrednio w ciągu zamiast kolumny.
Reguły konwersji
Ze względu na różnice strukturalne między ramkami danych i xml istnieją pewne reguły konwersji z danych XML do ramki danych i z ramki danych XML do danych XML. Można wyłączyć obsługę atrybutów za pomocą opcji excludeAttribute
.
Konwertowanie kodu XML na ramkę danych
Atrybuty: atrybuty są konwertowane jako pola z prefiksem określonym w
attributePrefix
opcji. JeśliattributePrefix
element to_
, dokument<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
tworzy schemat:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
Jeśli element ma atrybuty, ale nie elementy podrzędne, wartość atrybutu jest umieszczana w osobnym polu określonym w
valueTag
opcji. JeślivalueTag
element to_VALUE
, dokument<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
tworzy schemat:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
Konwertowanie ramki danych na xml
Pisanie pliku XML z ramki danych o polu ArrayType
z jego elementem, tak jak ArrayType
w przypadku dodatkowego zagnieżdżonego pola dla elementu. Nie dzieje się tak w odczytywaniu i zapisywaniu danych XML, ale podczas zapisywania ramki danych odczytanej z innych źródeł. W związku z tym dwukierunkowe odczytywanie i zapisywanie plików XML ma taką samą strukturę, ale zapisywanie ramki danych odczytanej z innych źródeł jest możliwe, aby mieć inną strukturę.
Ramka danych ze schematem:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
i dane:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
tworzy plik XML:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>