Udostępnij za pośrednictwem


Odczytywanie i zapisywanie danych XML przy użyciu biblioteki spark-xml

Ważne

Ta dokumentacja została wycofana i może nie zostać zaktualizowana. Produkty, usługi lub technologie wymienione w treści nie są oficjalnie zatwierdzane ani testowane przez usługę Databricks.

Natywna obsługa formatu pliku XML jest dostępna jako publiczna wersja zapoznawcza. Zobacz Odczytywanie i zapisywanie plików XML.

W tym artykule opisano sposób odczytywania i zapisywania pliku XML jako źródła danych platformy Apache Spark.

Wymagania

  1. Utwórz bibliotekę spark-xml jako bibliotekę Maven. Dla współrzędnych Maven określ:

    • Środowisko Databricks Runtime 7.x i nowsze: com.databricks:spark-xml_2.12:<release>

    Zobacz spark-xml Wydania dla najnowszej wersji programu <release>.

  2. Zainstaluj bibliotekę w klastrze.

Przykład

W przykładzie w tej sekcji jest używany plik XML książek .

  1. Pobierz plik XML książek:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  2. Przekaż plik do systemu plików DBFS.

Odczytywanie i zapisywanie danych XML

SQL

/*Infer schema*/

CREATE TABLE books
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

/*Specify column names and types*/

CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING xml
OPTIONS (path "dbfs:/books.xml", rowTag "book")

Scala

// Infer schema

import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method

val df = spark.read
  .option("rowTag", "book")
  .xml("dbfs:/books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

// Specify schema

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))

val df = spark.read
  .option("rowTag", "book")
  .schema(customSchema)
  .xml("books.xml")

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("dbfs:/newbooks.xml")

R

# Infer schema

library(SparkR)

sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))

df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")

# Default `rootTag` and `rowTag`
write.df(df, "dbfs:/newbooks.xml", "xml")

# Specify schema

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")

# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")

Opcje

  • Czytać
    • path: lokalizacja plików XML. Akceptuje standardowe wyrażenia globbing platformy Hadoop.
    • rowTag: tag wiersza, który ma być traktowany jako wiersz. Na przykład w tym pliku XML <books><book><book>...</books>wartość to book. Wartość domyślna to ROW.
    • samplingRatio: Współczynnik próbkowania dla schematu wnioskowania (0,0 ~ 1). Wartość domyślna to 1. Możliwe typy to StructType, , StringTypeDoubleTypeLongTypeBooleanTypeArrayTypeTimestampType , i NullType, chyba że podajesz schemat.
    • excludeAttribute: czy wykluczać atrybuty w elementach. Wartość domyślna to „fałsz”.
    • nullValue: wartość, która ma być traktowana null jako wartość. Wartość domyślna to "".
    • mode: tryb radzenia sobie z uszkodzonymi rekordami. Wartość domyślna to PERMISSIVE.
      • PERMISSIVE:
        • Gdy napotka uszkodzony rekord, ustawia wszystkie pola na null i umieszcza źle sformułowany ciąg w nowym polu skonfigurowanym przez columnNameOfCorruptRecordprogram .
        • W przypadku napotkania pola nieprawidłowego typu danych ustawia pole obraźliwe na nullwartość .
      • DROPMALFORMED: ignoruje uszkodzone rekordy.
      • FAILFAST: zgłasza wyjątek podczas wykrywania uszkodzonych rekordów.
    • inferSchema: jeśli trueprogram próbuje wywnioskować odpowiedni typ dla każdej wynikowej kolumny DataFrame, na przykład typu wartości logicznej, liczbowej lub daty. Jeśli falsewszystkie wynikowe kolumny mają typ ciągu. Wartość domyślna to true.
    • columnNameOfCorruptRecord: nazwa nowego pola, w którym są przechowywane źle sformułowane ciągi. Wartość domyślna to _corrupt_record.
    • attributePrefix: prefiks atrybutów, aby rozróżniać atrybuty i elementy. Jest to prefiks nazw pól. Wartość domyślna to _.
    • valueTag: tag używany dla wartości, gdy istnieją atrybuty w elemecie, który nie ma elementów podrzędnych. Wartość domyślna to _VALUE.
    • charset: wartości domyślne, UTF-8 ale można ustawić na inne prawidłowe nazwy znaków.
    • ignoreSurroundingSpaces: Czy należy pominąć odstępy otaczające wartości. Wartość domyślna to „fałsz”.
    • rowValidationXSDPath: ścieżka do pliku XSD używanego do sprawdzania poprawności kodu XML dla każdego wiersza. Wiersze, które nie mogą sprawdzić poprawności, są traktowane jak błędy analizy, jak powyżej. XSD nie ma w inny sposób wpływu na podany lub wywnioskowany schemat. Jeśli ta sama ścieżka lokalna nie jest jeszcze widoczna w funkcjach wykonawczych w klastrze, to XSD i inne, od których zależy, powinny zostać dodane do funkcji wykonawczych platformy Spark za pomocą elementu SparkContext.addFile. W tym przypadku, aby użyć lokalnego XSD /foo/bar.xsd, wywołaj addFile("/foo/bar.xsd") i przekaż "bar.xsd" jako rowValidationXSDPath.
  • Pisać
    • path: lokalizacja do zapisu plików.
    • rowTag: tag wiersza, który ma być traktowany jako wiersz. Na przykład w tym pliku XML <books><book><book>...</books>wartość to book. Wartość domyślna to ROW.
    • rootTag: tag główny, który ma być traktowany jako katalog główny. Na przykład w tym pliku XML <books><book><book>...</books>wartość to books. Wartość domyślna to ROWS.
    • nullValue: wartość do zapisania null wartości. Wartość domyślna to ciąg "null". Gdy "null"parametr nie zapisuje atrybutów i elementów dla pól.
    • attributePrefix: prefiks atrybutów do rozróżniania atrybutów i elementów. Jest to prefiks nazw pól. Wartość domyślna to _.
    • valueTag: tag używany dla wartości, gdy istnieją atrybuty w elemecie, który nie ma elementów podrzędnych. Wartość domyślna to _VALUE.
    • compression: koder koder-dekoder kompresji używany podczas zapisywania w pliku. Powinna być w pełni kwalifikowaną nazwą klasy implementowania org.apache.hadoop.io.compress.CompressionCodec lub jedną z nazw krótkich bez uwzględniania wielkości liter (bzip2, gzip, lz4i snappy). Wartość domyślna nie jest kompresją.

Obsługuje skrócone użycie nazwy; Możesz użyć xml zamiast com.databricks.spark.xml.

Obsługa XSD

Poszczególne wiersze można zweryfikować względem schematu XSD przy użyciu polecenia rowValidationXSDPath.

Narzędzie służy com.databricks.spark.xml.util.XSDToSchema do wyodrębniania schematu ramki danych Platformy Spark z niektórych plików XSD. Obsługuje tylko proste, złożone i sekwencyjne typy, tylko podstawowe funkcje XSD i jest eksperymentalne.

import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)

Analizowanie zagnieżdżonego kodu XML

Mimo że służy głównie do konwertowania pliku XML na ramkę danych, można również użyć from_xml metody do analizowania kodu XML w kolumnie wartości ciągu w istniejącej ramce danych i dodać ją jako nową kolumnę z przeanalizowanymi wynikami jako strukturą:

import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._

val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))

Uwaga

  • mode:
    • Jeśli ustawiono PERMISSIVEwartość , domyślny tryb analizy jest domyślnie ustawiony na DROPMALFORMED. Jeśli dołączysz kolumnę do schematu, from_xml która jest zgodna z parametrem columnNameOfCorruptRecord, PERMISSIVE tryb zwraca źle sformułowane rekordy do tej kolumny w wynikowej struktury.
    • Jeśli ustawiono DROPMALFORMEDwartość , wartości XML, które nie analizują poprawnie, powodują null wyświetlenie wartości dla kolumny. Wiersze nie są porzucane.
  • from_xml Konwertuje tablice ciągów zawierających kod XML na tablice analizowanych struktur. Użycie w zamian parametru schema_of_xml_array.
  • from_xml_string jest alternatywą do użycia w funkcjach zdefiniowanych przez użytkownika, które działają bezpośrednio w ciągu zamiast kolumny.

Reguły konwersji

Ze względu na różnice strukturalne między ramkami danych i xml istnieją pewne reguły konwersji z danych XML do ramki danych i z ramki danych XML do danych XML. Można wyłączyć obsługę atrybutów za pomocą opcji excludeAttribute.

Konwertowanie kodu XML na ramkę danych

  • Atrybuty: atrybuty są konwertowane jako pola z prefiksem określonym w attributePrefix opcji. Jeśli attributePrefix element to _, dokument

    <one myOneAttrib="AAAA">
        <two>two</two>
        <three>three</three>
    </one>
    

    tworzy schemat:

    root
    |-- _myOneAttrib: string (nullable = true)
    |-- two: string (nullable = true)
    |-- three: string (nullable = true)
    
  • Jeśli element ma atrybuty, ale nie elementy podrzędne, wartość atrybutu jest umieszczana w osobnym polu określonym w valueTag opcji. Jeśli valueTag element to _VALUE, dokument

    <one>
        <two myTwoAttrib="BBBBB">two</two>
        <three>three</three>
    </one>
    

    tworzy schemat:

    root
    |-- two: struct (nullable = true)
    |    |-- _VALUE: string (nullable = true)
    |    |-- _myTwoAttrib: string (nullable = true)
    |-- three: string (nullable = true)
    

Konwertowanie ramki danych na xml

Pisanie pliku XML z ramki danych o polu ArrayType z jego elementem, tak jak ArrayType w przypadku dodatkowego zagnieżdżonego pola dla elementu. Nie dzieje się tak w odczytywaniu i zapisywaniu danych XML, ale podczas zapisywania ramki danych odczytanej z innych źródeł. W związku z tym dwukierunkowe odczytywanie i zapisywanie plików XML ma taką samą strukturę, ale zapisywanie ramki danych odczytanej z innych źródeł jest możliwe, aby mieć inną strukturę.

Ramka danych ze schematem:

 |-- a: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

i dane:

+------------------------------------+
|                                   a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

tworzy plik XML:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>