Delen via


Avro-bestand

Apache Avro is een systeem voor gegevensserialisatie. Avro biedt:

  • Uitgebreide gegevensstructuren.
  • Een compacte, snelle, binaire gegevensindeling.
  • Een containerbestand om permanente gegevens op te slaan.
  • Externe procedure-aanroep (RPC).
  • Eenvoudige integratie met dynamische talen. Het genereren van code is niet vereist voor het lezen of schrijven van gegevensbestanden en het gebruik of het implementeren van RPC-protocollen. Codegeneratie als optionele optimalisatie, alleen de moeite waard om te implementeren voor statisch getypte talen.

De Avro-gegevensbron ondersteunt:

  • Schemaconversie: Automatische conversie tussen Apache Spark SQL- en Avro-records.
  • Partitioneren: eenvoudig gepartitioneerde gegevens lezen en schrijven zonder extra configuratie.
  • Compressie: Compressie die moet worden gebruikt bij het schrijven van Avro naar schijf. De ondersteunde typen zijn uncompressed, snappyen deflate. U kunt ook het aflopende niveau opgeven.
  • Recordnamen: Recordnaam en naamruimte door een kaart met parameters door te geven met recordName en recordNamespace.

Zie ook Streaming Avro-gegevens lezen en schrijven.

Configuratie

U kunt het gedrag van een Avro-gegevensbron wijzigen met behulp van verschillende configuratieparameters.

Als u bestanden zonder extensie .avro wilt negeren tijdens het lezen, kunt u de parameter avro.mapred.ignore.inputs.without.extension instellen in de Hadoop-configuratie. De standaardwaarde is false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Als u compressie wilt configureren bij het schrijven, stelt u de volgende Spark-eigenschappen in:

  • Compressiecodec: spark.sql.avro.compression.codec. Ondersteunde codecs zijn snappy en deflate. De standaardcodec is snappy.
  • Als de compressiecodec is deflate, kunt u het compressieniveau instellen met: spark.sql.avro.deflate.level. Het standaardniveau is -1.

U kunt deze eigenschappen instellen in de Spark-clusterconfiguratie of tijdens runtime.spark.conf.set() Voorbeeld:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Voor Databricks Runtime 9.1 LTS en hoger kunt u het standaardgedrag voor schemadeductie in Avro wijzigen door de optie op te geven bij het mergeSchema lezen van bestanden. Als u wilt true instellenmergeSchema, wordt een schema afgeleid van een set Avro-bestanden in de doelmap en worden ze samengevoegd in plaats van het leesschema uit één bestand af te leiden.

Ondersteunde typen voor Avro -> Spark SQL-conversie

Deze bibliotheek ondersteunt het lezen van alle Avro-typen. Er wordt gebruikgemaakt van de volgende toewijzing van Avro-typen naar Spark SQL-typen:

Avro-type Spark SQL-type
boolean BooleanType
int IntegerType
long LongType
zwevend FloatType
dubbel DoubleType
bytes BinaryType
tekenreeks StringType
record StructType
enum StringType
matrix ArrayType
map MapType
vast BinaryType
union Zie Union-typen.

Samenvoegtypen

De Avro-gegevensbron ondersteunt leestypen union . Avro beschouwt de volgende drie typen als union typen:

  • union(int, long) wordt toegewezen aan LongType.
  • union(float, double) wordt toegewezen aan DoubleType.
  • union(something, null), waarbij something elk ondersteund Avro-type is. Dit wordt toegewezen aan hetzelfde Spark SQL-type als dat van something, waarbij nullable dit is ingesteld op true.

Alle andere union typen zijn complexe typen. Ze worden toegewezen aan StructType waar veldnamen zijn member0, member1enzovoort, in overeenstemming met leden van de union. Dit is consistent met het gedrag bij het converteren tussen Avro en Parquet.

Logische typen

De Avro-gegevensbron ondersteunt het lezen van de volgende logische Avro-typen:

Logisch avro-type Avro-type Spark SQL-type
datum int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal vast DecimalType
decimal bytes DecimalType

Notitie

De Avro-gegevensbron negeert documenten, aliassen en andere eigenschappen die aanwezig zijn in het Avro-bestand.

Ondersteunde typen voor Spark SQL -> Avro-conversie

Deze bibliotheek ondersteunt het schrijven van alle Spark SQL-typen in Avro. Voor de meeste typen is de toewijzing van Spark-typen naar Avro-typen eenvoudig (bijvoorbeeld IntegerType wordt geconverteerd naar int); hieronder volgt een lijst met de weinige speciale gevallen:

Spark SQL-type Avro-type Logisch avro-type
ByteType int
ShortType int
BinaryType bytes
DecimalType vast decimal
TimestampType long timestamp-micros
DateType int datum

U kunt ook het hele Avro-uitvoerschema opgeven met de optie avroSchema, zodat Spark SQL-typen kunnen worden geconverteerd naar andere Avro-typen. De volgende conversies worden niet standaard toegepast en vereisen door de gebruiker opgegeven Avro-schema:

Spark SQL-type Avro-type Logisch avro-type
ByteType vast
StringType enum
DecimalType bytes decimal
TimestampType long timestamp-millis

Voorbeelden

In deze voorbeelden wordt het bestand episode.avro gebruikt.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

In dit voorbeeld ziet u een aangepast Avro-schema:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

In dit voorbeeld ziet u opties voor Avro-compressie:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

In dit voorbeeld ziet u gepartitioneerde Avro-records:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

In dit voorbeeld ziet u de recordnaam en naamruimte:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Als u een query wilt uitvoeren op Avro-gegevens in SQL, registreert u het gegevensbestand als een tabel of tijdelijke weergave:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Voorbeeld van notitieblok: Avro-bestanden lezen en schrijven

In het volgende notebook ziet u hoe u Avro-bestanden kunt lezen en schrijven.

Avro-bestandennotitieblok lezen en schrijven

Notebook downloaden