Udostępnij za pośrednictwem


Plik Avro

Apache Avro to system serializacji danych. Firma Avro zapewnia:

  • Rozbudowane struktury danych.
  • Kompaktowy, szybki, binarny format danych.
  • Plik kontenera do przechowywania trwałych danych.
  • Zdalne wywołanie procedury (RPC).
  • Prosta integracja z językami dynamicznymi. Generowanie kodu nie jest wymagane do odczytywania ani zapisywania plików danych ani używania ani implementowania protokołów RPC. Generowanie kodu jako opcjonalna optymalizacja, warto zaimplementować tylko w przypadku języków statycznie typiowanych.

Źródło danych Avro obsługuje:

  • Konwersja schematu: automatyczna konwersja między rekordami Apache Spark SQL i Avro.
  • Partycjonowanie: łatwe odczytywanie i zapisywanie partycjonowanych danych bez dodatkowej konfiguracji.
  • Kompresja: kompresja używana podczas zapisywania avro na dysku. Obsługiwane typy to uncompressed, snappyi deflate. Można również określić poziom deflacji.
  • Nazwy rekordów: nazwa rekordu i przestrzeń nazw rekordów poprzez przekazanie mapy parametrów za pomocą recordName i recordNamespace.

Zobacz również Odczytywanie i zapisywanie strumieniowych danych Avro.

Konfigurowanie

Zachowanie źródła danych Avro można zmienić przy użyciu różnych parametrów konfiguracji.

Aby zignorować pliki bez rozszerzenia .avro podczas odczytywania, można ustawić parametr avro.mapred.ignore.inputs.without.extension w konfiguracji usługi Hadoop. Wartość domyślna to false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Aby skonfigurować kompresję podczas pisania, ustaw następujące właściwości platformy Spark:

  • Kodek kompresji: spark.sql.avro.compression.codec. Obsługiwane koderki to snappy i deflate. Domyślny koder koderowy to snappy.
  • Jeśli koder kompresji jest deflate, można ustawić poziom kompresji na: spark.sql.avro.deflate.level. Domyślnym poziomem jest -1.

Te właściwości można ustawić w konfiguracji klastra Spark lub w czasie wykonywania przy użyciu spark.conf.set(). Na przykład:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

W przypadku Databricks Runtime 9.1 LTS i nowszych można zmienić domyślne zachowanie wnioskowania schematu w Avro, podając opcję mergeSchema przy odczycie plików. Ustawienie mergeSchema na true spowoduje wnioskowanie schematu z zestawu plików Avro w katalogu docelowym i scalanie ich, a nie wnioskowanie schematu odczytu z jednego pliku.

Obsługiwane typy dla Avro — konwersja Spark SQL

Ta biblioteka obsługuje odczyt wszystkich typów danych Avro. Używa następującego mapowania z typów Avro na typy Spark SQL:

Typ Avro Typ Spark SQL
boolean Typ logiczny
int Typ Integer
długi LongType
liczba zmiennoprzecinkowa FloatType
double DoubleType
B Typ Binarny
ciąg znaków StringType
rekord Typ struktury
enum StringType
tablica ArrayType
mapa Typ mapy
naprawiony Typ binarny
unia Zobacz typy unii.

Typy unii

Źródło danych Avro obsługuje odczyt typów union. Avro uznaje następujące trzy typy za union typy:

  • union(int, long) odnosi się do LongType.
  • union(float, double) mapuje na DoubleType.
  • union(something, null), gdzie something jest dowolnym obsługiwanym typem Avro. To mapuje na ten sam typ Spark SQL, co something, z nullable ustawionym na true.

Wszystkie inne union typy są typami złożonymi. Mapuje się je na StructType, gdzie nazwy pól to member0, member1itd., zgodnie z członkami union. Jest to zgodne z zachowaniem podczas konwersji między Avro a Parquet.

Typy logiczne

Źródło danych Avro obsługuje odczytywanie następujących typów logicznych Avro:

Typ logiczny Avro Typ Avro Typ Spark SQL
data int Typ daty
znacznik czasu-w milisekundach długi TimestampType
znacznik czasu-mikrosekundy długi TimestampType
dziesiętny naprawiony Typ dziesiętny
dziesiętny B Typ dziesiętny

Uwaga

Źródło danych Avro ignoruje dokumenty, aliasy i inne właściwości obecne w pliku Avro.

Obsługiwane typy dla obsługi Spark SQL —> konwersja Avro

Ta biblioteka obsługuje pisanie wszystkich typów spark SQL w usłudze Avro. W przypadku większości typów mapowanie z typów platformy Spark na typy Avro jest proste (na przykład IntegerType jest konwertowane na int); poniżej znajduje się lista kilku przypadków specjalnych:

Typ Spark SQL Typ Avro Typ logiczny Avro
Typ bajtu int
ShortType int
BinaryType B
Typ liczby dziesiętnej stały dziesiętny
Typ znacznika czasu długi znacznik czasu-mikrosekundy
Typ daty int data

Można również określić cały schemat danych wyjściowych Avro z opcją avroSchema, aby typy Spark SQL można przekonwertować na inne typy Avro. Następujące konwersje nie są stosowane domyślnie i wymagają określonego przez użytkownika schematu Avro:

Typ Spark SQL Typ Avro Typ logiczny Avro
Typ bajtu stały
StringType wyliczenie
Typ dziesiętny B dziesiętny
TypZnacznikaCzasu długo timestamp-millis

Przykłady

W tych przykładach użyto pliku episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

W tym przykładzie pokazano niestandardowy schemat Avro:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

W tym przykładzie przedstawiono opcje kompresji Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

W tym przykładzie pokazano partycjonowane rekordy Avro:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

W tym przykładzie przedstawiono nazwę rekordu i przestrzeń nazw:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Aby wysłać zapytanie do danych Avro w programie SQL, zarejestruj plik danych jako tabelę lub widok tymczasowy:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Przykład notatnika: odczytywanie i zapisywanie plików Avro

W poniższym notesie pokazano, jak odczytywać i zapisywać pliki Avro.

Odczytywanie i zapisywanie w notatniku plików Avro

Zdobyć notes