Avro-bestand
Apache Avro is een systeem voor gegevensserialisatie. Avro biedt:
- Uitgebreide gegevensstructuren.
- Een compacte, snelle, binaire gegevensindeling.
- Een containerbestand om permanente gegevens op te slaan.
- Externe procedure-aanroep (RPC).
- Eenvoudige integratie met dynamische talen. Het genereren van code is niet vereist voor het lezen of schrijven van gegevensbestanden en het gebruik of het implementeren van RPC-protocollen. Codegeneratie als optionele optimalisatie, alleen de moeite waard om te implementeren voor statisch getypte talen.
De Avro-gegevensbron ondersteunt:
- Schemaconversie: Automatische conversie tussen Apache Spark SQL- en Avro-records.
- Partitioneren: eenvoudig gepartitioneerde gegevens lezen en schrijven zonder extra configuratie.
- Compressie: Compressie die moet worden gebruikt bij het schrijven van Avro naar schijf. De ondersteunde typen zijn
uncompressed
,snappy
endeflate
. U kunt ook het aflopende niveau opgeven. - Recordnamen: Recordnaam en naamruimte door een kaart met parameters door te geven met
recordName
enrecordNamespace
.
Zie ook Streaming Avro-gegevens lezen en schrijven.
Configuratie
U kunt het gedrag van een Avro-gegevensbron wijzigen met behulp van verschillende configuratieparameters.
Als u bestanden zonder extensie .avro
wilt negeren tijdens het lezen, kunt u de parameter avro.mapred.ignore.inputs.without.extension
instellen in de Hadoop-configuratie. De standaardwaarde is false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Als u compressie wilt configureren bij het schrijven, stelt u de volgende Spark-eigenschappen in:
- Compressiecodec:
spark.sql.avro.compression.codec
. Ondersteunde codecs zijnsnappy
endeflate
. De standaardcodec issnappy
. - Als de compressiecodec is
deflate
, kunt u het compressieniveau instellen met:spark.sql.avro.deflate.level
. Het standaardniveau is-1
.
U kunt deze eigenschappen instellen in de Spark-clusterconfiguratie of tijdens runtime.spark.conf.set()
Voorbeeld:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Voor Databricks Runtime 9.1 LTS en hoger kunt u het standaardgedrag voor schemadeductie in Avro wijzigen door de optie op te geven bij het mergeSchema
lezen van bestanden. Als u wilt true
instellenmergeSchema
, wordt een schema afgeleid van een set Avro-bestanden in de doelmap en worden ze samengevoegd in plaats van het leesschema uit één bestand af te leiden.
Ondersteunde typen voor Avro -> Spark SQL-conversie
Deze bibliotheek ondersteunt het lezen van alle Avro-typen. Er wordt gebruikgemaakt van de volgende toewijzing van Avro-typen naar Spark SQL-typen:
Avro-type | Spark SQL-type |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
zwevend | FloatType |
dubbel | DoubleType |
bytes | BinaryType |
tekenreeks | StringType |
record | StructType |
enum | StringType |
matrix | ArrayType |
map | MapType |
vast | BinaryType |
union | Zie Union-typen. |
Samenvoegtypen
De Avro-gegevensbron ondersteunt leestypen union
. Avro beschouwt de volgende drie typen als union
typen:
union(int, long)
wordt toegewezen aanLongType
.union(float, double)
wordt toegewezen aanDoubleType
.union(something, null)
, waarbijsomething
elk ondersteund Avro-type is. Dit wordt toegewezen aan hetzelfde Spark SQL-type als dat vansomething
, waarbijnullable
dit is ingesteld optrue
.
Alle andere union
typen zijn complexe typen. Ze worden toegewezen aan StructType
waar veldnamen zijn member0
, member1
enzovoort, in overeenstemming met leden van de union
. Dit is consistent met het gedrag bij het converteren tussen Avro en Parquet.
Logische typen
De Avro-gegevensbron ondersteunt het lezen van de volgende logische Avro-typen:
Logisch avro-type | Avro-type | Spark SQL-type |
---|---|---|
datum | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | vast | DecimalType |
decimal | bytes | DecimalType |
Notitie
De Avro-gegevensbron negeert documenten, aliassen en andere eigenschappen die aanwezig zijn in het Avro-bestand.
Ondersteunde typen voor Spark SQL -> Avro-conversie
Deze bibliotheek ondersteunt het schrijven van alle Spark SQL-typen in Avro. Voor de meeste typen is de toewijzing van Spark-typen naar Avro-typen eenvoudig (bijvoorbeeld IntegerType
wordt geconverteerd naar int
); hieronder volgt een lijst met de weinige speciale gevallen:
Spark SQL-type | Avro-type | Logisch avro-type |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | vast | decimal |
TimestampType | long | timestamp-micros |
DateType | int | datum |
U kunt ook het hele Avro-uitvoerschema opgeven met de optie avroSchema
, zodat Spark SQL-typen kunnen worden geconverteerd naar andere Avro-typen.
De volgende conversies worden niet standaard toegepast en vereisen door de gebruiker opgegeven Avro-schema:
Spark SQL-type | Avro-type | Logisch avro-type |
---|---|---|
ByteType | vast | |
StringType | enum | |
DecimalType | bytes | decimal |
TimestampType | long | timestamp-millis |
Voorbeelden
In deze voorbeelden wordt het bestand episode.avro gebruikt.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
In dit voorbeeld ziet u een aangepast Avro-schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
In dit voorbeeld ziet u opties voor Avro-compressie:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
In dit voorbeeld ziet u gepartitioneerde Avro-records:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
In dit voorbeeld ziet u de recordnaam en naamruimte:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Als u een query wilt uitvoeren op Avro-gegevens in SQL, registreert u het gegevensbestand als een tabel of tijdelijke weergave:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Voorbeeld van notitieblok: Avro-bestanden lezen en schrijven
In het volgende notebook ziet u hoe u Avro-bestanden kunt lezen en schrijven.