Udostępnij za pośrednictwem


Plik Avro

Apache Avro to system serializacji danych. Firma Avro zapewnia:

  • Rozbudowane struktury danych.
  • Kompaktowy, szybki, binarny format danych.
  • Plik kontenera do przechowywania trwałych danych.
  • Zdalne wywołanie procedury (RPC).
  • Prosta integracja z językami dynamicznymi. Generowanie kodu nie jest wymagane do odczytywania ani zapisywania plików danych ani używania ani implementowania protokołów RPC. Generowanie kodu jako opcjonalna optymalizacja, warto zaimplementować tylko w przypadku języków statycznie typiowanych.

Źródło danych Avro obsługuje:

  • Konwersja schematu: automatyczna konwersja między rekordami Apache Spark SQL i Avro.
  • Partycjonowanie: łatwe odczytywanie i zapisywanie partycjonowanych danych bez dodatkowej konfiguracji.
  • Kompresja: kompresja używana podczas zapisywania avro na dysku. Obsługiwane typy to uncompressed, snappyi deflate. Można również określić poziom deflacji.
  • Nazwy rekordów: nazwa rekordu i przestrzeń nazw, przekazując mapę parametrów za pomocą recordName parametrów i recordNamespace.

Zobacz również Odczytywanie i zapisywanie danych avro przesyłanych strumieniowo.

Konfigurowanie

Zachowanie źródła danych Avro można zmienić przy użyciu różnych parametrów konfiguracji.

Aby zignorować pliki bez .avro rozszerzenia podczas odczytywania, można ustawić parametr avro.mapred.ignore.inputs.without.extension w konfiguracji usługi Hadoop. Wartość domyślna to false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Aby skonfigurować kompresję podczas pisania, ustaw następujące właściwości platformy Spark:

  • Koder koder-dekoder kompresji: spark.sql.avro.compression.codec. Obsługiwane koderki to snappy i deflate. Domyślny koder koderowy to snappy.
  • Jeśli koder koder kompresji to deflate, możesz ustawić poziom kompresji na: spark.sql.avro.deflate.level. Domyślnym poziomem jest -1.

Te właściwości można ustawić w konfiguracji platformy Spark klastra lub w czasie wykonywania przy użyciu polecenia spark.conf.set(). Na przykład:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

W przypadku środowiska Databricks Runtime 9.1 LTS lub nowszego można zmienić domyślne zachowanie wnioskowania schematu w usłudze Avro, podając mergeSchema opcję podczas odczytywania plików. Ustawienie mergeSchema , aby true wywnioskować schemat z zestawu plików Avro w katalogu docelowym i scali je, a nie wywnioskować schemat odczytu z jednego pliku.

Obsługiwane typy dla usługi Avro —> konwersja spark SQL

Ta biblioteka obsługuje odczytywanie wszystkich typów Avro. Używa następującego mapowania z typów Avro na typy Spark SQL:

Typ Avro Typ spark SQL
boolean Typ logiczny
int Typ liczby całkowitej
długi LongType
liczba zmiennoprzecinkowa FloatType
double DoubleType
B BinaryType
string StringType
rekord Typ struktury
wyliczenie StringType
tablica ArrayType
map Typ mapy
stały BinaryType
unia Zobacz Typy unii.

Typy unii

Źródło danych Avro obsługuje typy odczytu union . Firma Avro uważa, że następujące trzy typy mają być union typami:

  • union(int, long) mapuje na LongType.
  • union(float, double) mapuje na DoubleType.
  • union(something, null), gdzie something jest dowolnym obsługiwanym typem Avro. To mapuje na ten sam typ spark SQL co somethingtyp , z ustawioną wartością nullable true.

Wszystkie inne union typy są typami złożonymi. Mapują one na StructType lokalizację, w której nazwy pól to member0, member1i tak dalej, zgodnie z elementami członkowskimi .union Jest to zgodne z zachowaniem podczas konwertowania między Avro i Parquet.

Typy logiczne

Źródło danych Avro obsługuje odczytywanie następujących typów logicznych Avro:

Typ logiczny Avro Typ Avro Typ spark SQL
data int Typ daty
timestamp-millis długi TimestampType
timestamp-micros długi TimestampType
decimal stały Typ dziesiętny
decimal B Typ dziesiętny

Uwaga

Źródło danych Avro ignoruje dokumenty, aliasy i inne właściwości obecne w pliku Avro.

Obsługiwane typy dla usługi Spark SQL —> konwersja avro

Ta biblioteka obsługuje pisanie wszystkich typów spark SQL w usłudze Avro. W przypadku większości typów mapowanie z typów platformy Spark na typy Avro jest proste (na przykład IntegerType jest konwertowane na int); poniżej znajduje się lista kilku specjalnych przypadków:

Typ spark SQL Typ Avro Typ logiczny Avro
Typ bajtu int
ShortType int
BinaryType B
Typ dziesiętny stały decimal
TimestampType długi timestamp-micros
Typ daty int data

Można również określić cały schemat danych wyjściowych Avro z opcją avroSchema, aby typy Spark SQL można przekonwertować na inne typy Avro. Następujące konwersje nie są stosowane domyślnie i wymagają określonego przez użytkownika schematu Avro:

Typ spark SQL Typ Avro Typ logiczny Avro
Typ bajtu stały
StringType wyliczenie
Typ dziesiętny B decimal
TimestampType długi timestamp-millis

Przykłady

W tych przykładach użyto pliku episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

W tym przykładzie pokazano niestandardowy schemat Avro:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

W tym przykładzie przedstawiono opcje kompresji Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

W tym przykładzie pokazano partycjonowane rekordy Avro:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

W tym przykładzie przedstawiono nazwę rekordu i przestrzeń nazw:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Aby wysłać zapytanie do danych Avro w programie SQL, zarejestruj plik danych jako tabelę lub widok tymczasowy:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Przykład notesu: odczytywanie i zapisywanie plików Avro

W poniższym notesie pokazano, jak odczytywać i zapisywać pliki Avro.

Odczytywanie i zapisywanie notesu plików Avro

Pobierz notes