Plik Avro
Apache Avro to system serializacji danych. Firma Avro zapewnia:
- Rozbudowane struktury danych.
- Kompaktowy, szybki, binarny format danych.
- Plik kontenera do przechowywania trwałych danych.
- Zdalne wywołanie procedury (RPC).
- Prosta integracja z językami dynamicznymi. Generowanie kodu nie jest wymagane do odczytywania ani zapisywania plików danych ani używania ani implementowania protokołów RPC. Generowanie kodu jako opcjonalna optymalizacja, warto zaimplementować tylko w przypadku języków statycznie typiowanych.
- Konwersja schematu: automatyczna konwersja między rekordami Apache Spark SQL i Avro.
- Partycjonowanie: łatwe odczytywanie i zapisywanie partycjonowanych danych bez dodatkowej konfiguracji.
- Kompresja: kompresja używana podczas zapisywania avro na dysku. Obsługiwane typy to
uncompressed
,snappy
ideflate
. Można również określić poziom deflacji. - Nazwy rekordów: nazwa rekordu i przestrzeń nazw, przekazując mapę parametrów za pomocą
recordName
parametrów irecordNamespace
.
Zobacz również Odczytywanie i zapisywanie danych avro przesyłanych strumieniowo.
Konfigurowanie
Zachowanie źródła danych Avro można zmienić przy użyciu różnych parametrów konfiguracji.
Aby zignorować pliki bez .avro
rozszerzenia podczas odczytywania, można ustawić parametr avro.mapred.ignore.inputs.without.extension
w konfiguracji usługi Hadoop. Wartość domyślna to false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Aby skonfigurować kompresję podczas pisania, ustaw następujące właściwości platformy Spark:
- Koder koder-dekoder kompresji:
spark.sql.avro.compression.codec
. Obsługiwane koderki tosnappy
ideflate
. Domyślny koder koderowy tosnappy
. - Jeśli koder koder kompresji to
deflate
, możesz ustawić poziom kompresji na:spark.sql.avro.deflate.level
. Domyślnym poziomem jest-1
.
Te właściwości można ustawić w konfiguracji platformy Spark klastra lub w czasie wykonywania przy użyciu polecenia spark.conf.set()
. Na przykład:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
W przypadku środowiska Databricks Runtime 9.1 LTS lub nowszego można zmienić domyślne zachowanie wnioskowania schematu w usłudze Avro, podając mergeSchema
opcję podczas odczytywania plików. Ustawienie mergeSchema
, aby true
wywnioskować schemat z zestawu plików Avro w katalogu docelowym i scali je, a nie wywnioskować schemat odczytu z jednego pliku.
Obsługiwane typy dla usługi Avro —> konwersja spark SQL
Ta biblioteka obsługuje odczytywanie wszystkich typów Avro. Używa następującego mapowania z typów Avro na typy Spark SQL:
Typ Avro | Typ spark SQL |
---|---|
boolean | Typ logiczny |
int | Typ liczby całkowitej |
długi | LongType |
liczba zmiennoprzecinkowa | FloatType |
double | DoubleType |
B | BinaryType |
string | StringType |
rekord | Typ struktury |
wyliczenie | StringType |
tablica | ArrayType |
map | Typ mapy |
stały | BinaryType |
unia | Zobacz Typy unii. |
Typy unii
Źródło danych Avro obsługuje typy odczytu union
. Firma Avro uważa, że następujące trzy typy mają być union
typami:
union(int, long)
mapuje naLongType
.union(float, double)
mapuje naDoubleType
.union(something, null)
, gdziesomething
jest dowolnym obsługiwanym typem Avro. To mapuje na ten sam typ spark SQL cosomething
typ , z ustawioną wartościąnullable
true
.
Wszystkie inne union
typy są typami złożonymi. Mapują one na StructType
lokalizację, w której nazwy pól to member0
, member1
i tak dalej, zgodnie z elementami członkowskimi .union
Jest to zgodne z zachowaniem podczas konwertowania między Avro i Parquet.
Typy logiczne
Źródło danych Avro obsługuje odczytywanie następujących typów logicznych Avro:
Typ logiczny Avro | Typ Avro | Typ spark SQL |
---|---|---|
data | int | Typ daty |
timestamp-millis | długi | TimestampType |
timestamp-micros | długi | TimestampType |
decimal | stały | Typ dziesiętny |
decimal | B | Typ dziesiętny |
Uwaga
Źródło danych Avro ignoruje dokumenty, aliasy i inne właściwości obecne w pliku Avro.
Obsługiwane typy dla usługi Spark SQL —> konwersja avro
Ta biblioteka obsługuje pisanie wszystkich typów spark SQL w usłudze Avro. W przypadku większości typów mapowanie z typów platformy Spark na typy Avro jest proste (na przykład IntegerType
jest konwertowane na int
); poniżej znajduje się lista kilku specjalnych przypadków:
Typ spark SQL | Typ Avro | Typ logiczny Avro |
---|---|---|
Typ bajtu | int | |
ShortType | int | |
BinaryType | B | |
Typ dziesiętny | stały | decimal |
TimestampType | długi | timestamp-micros |
Typ daty | int | data |
Można również określić cały schemat danych wyjściowych Avro z opcją avroSchema
, aby typy Spark SQL można przekonwertować na inne typy Avro.
Następujące konwersje nie są stosowane domyślnie i wymagają określonego przez użytkownika schematu Avro:
Typ spark SQL | Typ Avro | Typ logiczny Avro |
---|---|---|
Typ bajtu | stały | |
StringType | wyliczenie | |
Typ dziesiętny | B | decimal |
TimestampType | długi | timestamp-millis |
Przykłady
W tych przykładach użyto pliku episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
W tym przykładzie pokazano niestandardowy schemat Avro:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
W tym przykładzie przedstawiono opcje kompresji Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
W tym przykładzie pokazano partycjonowane rekordy Avro:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
W tym przykładzie przedstawiono nazwę rekordu i przestrzeń nazw:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Aby wysłać zapytanie do danych Avro w programie SQL, zarejestruj plik danych jako tabelę lub widok tymczasowy:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Przykład notesu: odczytywanie i zapisywanie plików Avro
W poniższym notesie pokazano, jak odczytywać i zapisywać pliki Avro.