Dela via


Avro-fil

Apache Avro är ett data serialiseringssystem. Avro tillhandahåller:

  • Omfattande datastrukturer.
  • Ett kompakt, snabbt, binärt dataformat.
  • En containerfil för att lagra beständiga data.
  • Fjärrproceduranrop (RPC).
  • Enkel integrering med dynamiska språk. Kodgenerering krävs inte för att läsa eller skriva datafiler eller för att använda eller implementera RPC-protokoll. Kodgenerering som en valfri optimering, bara värt att implementera för statiskt skrivna språk.

Avro-datakällan stöder:

  • Schema konvertering: Automatisk konvertering mellan Apache Spark SQL och Avro-rekord.
  • Partitionering: Läs och skriv enkelt partitionerade data utan extra konfiguration.
  • Komprimering: Komprimering som ska användas när Avro skrivs ut till disk. De typer som stöds är uncompressed, snappyoch deflate. Du kan också ange deflate-nivån.
  • Postnamn: Postnamn och namnområde genom att överföra en karta över parameters med recordName och recordNamespace.

Se även Läsa och skriva strömmande Avro-data.

Konfiguration

Du kan ändra beteendet för en Avro-datakälla med hjälp av olika konfigurationer parameters.

Om du vill ignorera filer utan .avro-tillägget vid läsning kan du set parametern avro.mapred.ignore.inputs.without.extension i Hadoop-konfigurationen. Standardvärdet är false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Om du vill konfigurera komprimering när du skriver set du följande Spark-egenskaper:

  • Komprimeringskod: spark.sql.avro.compression.codec. Codecs som stöds är snappy och deflate. Standardkodcen är snappy.
  • Om komprimeringskodcen är deflatekan du set komprimeringsnivån med: spark.sql.avro.deflate.level. Standardnivån är -1.

Du kan set dessa inställningar i konfigurationen för klustret Spark eller vid körning med hjälp av spark.conf.set(). Till exempel:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

För Databricks Runtime 9.1 LTS och senare kan du ändra standardbeteendet för schema slutsatsdragning i Avro genom att ange mergeSchema alternativet när du läser filer. Om du anger mergeSchema till true härleds en schema från en set av Avro-filer i målkatalogen och sammanfogar dem i stället för att härleda schema från en enda fil.

Typer som stöds för Avro –> Spark SQL-konvertering

Det här biblioteket stöder läsning av alla Avro-typer. Den använder följande mappning från Avro-typer till Spark SQL-typer:

Avro-typ Spark SQL-typ
boolean BooleanType
heltal IntegerType
lång LongType
flyttal FloatType
dubbel DoubleType
byte BinaryType
sträng StringType
rekord StructType
uppräkning StringType
matris ArrayType
map MapType
fast BinaryType
union Se Union-typer.

Union-typer

Avro-datakällan stöder lästyper union . Avro anser att följande tre typer är union typer:

  • union(int, long) mappar till LongType.
  • union(float, double) mappar till DoubleType.
  • union(something, null), whereochsomething är alla typer som stöds av Avro. Detta mappar till samma Spark SQL-typ som för something, med nullableset till true.

Alla andra union typer är komplexa typer. De mappas till StructTypewhere fältnamn är member0, member1och så vidare, i enlighet med medlemmarna i union. Detta överensstämmer med beteendet vid konvertering mellan Avro och Parquet.

Logiska typer

Avro-datakällan stöder läsning av följande logiska Avro-typer:

Logisk avro-typ Avro-typ Spark SQL-typ
datum heltal DateType
timestamp-millis lång Tidsstämpeltyp
timestamp-micros lång Tidsstämpeltyp
decimal fast Decimaltyp
decimal byte Decimaltyp

Kommentar

Avro-datakällan ignorerar dokument, alias och andra egenskaper som finns i Avro-filen.

Typer som stöds för Spark SQL –> Avro-konvertering

Det här biblioteket stöder skrivning av alla Spark SQL-typer till Avro. För de flesta typer är mappningen från Spark-typer till Avro-typer enkel (till exempel IntegerType konverteras till int); Följande är en list av de få specialfallen:

Spark SQL-typ Avro-typ Logisk avro-typ
ByteType heltal
ShortType heltal
BinaryType byte
Decimaltyp fast decimal
Tidsstämpeltyp lång timestamp-micros
DateType heltal datum

Du kan också ange hela Avro-utdata schema med alternativet avroSchema, så att Spark SQL-typer kan konverteras till andra Avro-typer. Följande konverteringar tillämpas inte som standard och kräver att användaren har angett Avro schema:

Spark SQL-typ Avro-typ Logisk avro-typ
ByteType fast
StringType uppräkning
Decimaltyp byte decimal
Tidsstämpeltyp lång timestamp-millis

Exempel

I de här exemplen används filen episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Det här exemplet visar en anpassad Avro-schema:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Det här exemplet visar Alternativ för Avro-komprimering:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Det här exemplet visar partitionerade Avro-poster:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Det här exemplet visar postnamnet och namnområdet:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Om du vill köra frågor mot Avro-data i SQL registrerar du datafilen som en table eller tillfällig vy:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Notebook-exempel: Läsa och skriva Avro-filer

Följande notebook-fil visar hur du läser och skriver Avro-filer.

Läsa och skriva Anteckningsbok för Avro-filer

Get anteckningsbok