Condividi tramite


File Avro

Apache Avro è un sistema di serializzazione dei dati. Avro fornisce:

  • Strutture dei dati arricchiti.
  • Formato di dati binario compatto e veloce.
  • Un file contenitore per archiviare i dati persistenti.
  • Remote procedure call (RPC).
  • Integrazione semplice con linguaggi dinamici. La generazione del codice non è necessaria per leggere o scrivere file di dati né per usare o implementare protocolli RPC. Generazione di codice come ottimizzazione facoltativa: conviene implementarla solo per i linguaggi tipizzati in modo statico.

L'origine dati Avro supporta:

  • Conversione dello schema: conversione automatica tra i record APACHE Spark SQL e Avro.
  • Partizionamento: leggere e scrivere facilmente dati partizionati senza alcuna configurazione aggiuntiva.
  • Compressione: compressione da usare durante la scrittura di Avro su disco. I tipi supportati sono uncompressed, snappy e deflate. È anche possibile specificare il livello di deflate.
  • Nomi dei record: nome e spazio dei nomi dei record tramite inserimento di una mappa di parametri con recordName e recordNamespace.

Vedere anche Leggere e scrivere dati Avro in streaming.

Impostazione

È possibile modificare il comportamento di un'origine dati Avro usando vari parametri di configurazione.

Per ignorare i file senza l'estensione .avro durante la lettura, è possibile impostare il parametro avro.mapred.ignore.inputs.without.extension nella configurazione di Hadoop. Il valore predefinito è false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Per configurare la compressione durante la scrittura, impostare le seguenti proprietà di Spark:

  • Codec di compressione: spark.sql.avro.compression.codec. I codec supportati sono snappy e deflate. Il codec predefinito è snappy.
  • Se il codec di compressione è deflate, è possibile impostare il livello di compressione con : spark.sql.avro.deflate.level. Il livello predefinito è -1.

È possibile impostare queste proprietà nella configurazione Spark del cluster o in fase di esecuzione usando spark.conf.set(). Ad esempio:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Per Databricks Runtime 9.1 LTS e versioni successive, è possibile modificare il comportamento di inferenza dello schema predefinito in Avro scegliendo l'opzione mergeSchema durante la lettura dei file. Se si imposta mergeSchema su true l'inferenza di uno schema avverrà unendo i file di un set di file Avro nella directory di destinazione invece che a partire dallo schema di lettura da un singolo file.

Tipi supportati per Avro -> Conversione spark SQL

Questa libreria supporta la lettura di tutti i tipi Avro. Per abbinare i tipi Avro ai tipi Spark SQL, usa il mapping seguente:

Tipo Avro Tipo SQL Spark
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
string StringType
registra StructType
enum StringType
array ArrayType
mappa MapType
fixed BinaryType
union Vedere Tipi unione.

Tipi unione

L'origine dati Avro supporta i tipi di lettura union. Avro presuppone che i tre tipi seguenti siano tipi union:

  • union(int, long) esegue il mapping a LongType.
  • union(float, double) esegue il mapping a DoubleType.
  • union(something, null), dove something è qualsiasi tipo Avro supportato. Viene eseguito il mapping allo stesso tipo di Spark SQL di something, con nullable impostato su true.

Tutti gli altri tipi union sono tipi complessi. Eseguono il mapping a StructType, in cui i nomi dei campi sono member0, member1 e così via, sulla base dei membri di union. Questo comportamento è coerente con il comportamento durante la conversione tra Avro e Parquet.

Tipi logici

L'origine dati Avro supporta la lettura dei seguenti tipi logici Avro:

Tipo logico Avro Tipo Avro Tipo SQL Spark
data int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimale fixed DecimalType
decimale bytes DecimalType

Nota

L'origine dati Avro ignora documenti, alias e altre proprietà presenti nel file Avro.

Tipi supportati per Spark SQL -> Conversione Avro

Questa libreria supporta la scrittura di tutti i tipi Spark SQL in Avro. Per la maggior parte dei tipi, il mapping dai tipi Spark ai tipi Avro è diretta (ad esempio IntegerType viene convertito in int); di seguito è riportato un elenco dei pochi casi speciali:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimale
TimestampType long timestamp-micros
DateType int data

È anche possibile specificare l'intero schema Avro di output con l'opzione avroSchema, in modo che i tipi Spark SQL possano essere convertiti in altri tipi Avro. Le conversioni seguenti non vengono applicate per impostazione predefinita e richiedono lo schema Avro specificato dall'utente:

Tipo SQL Spark Tipo Avro Tipo logico Avro
ByteType fixed
StringType enum
DecimalType bytes decimale
TimestampType long timestamp-millis

Esempi

Questi esempi usano il file episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Questo esempio illustra uno schema Avro personalizzato:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Questo esempio illustra le opzioni di compressione Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Questo esempio illustra i record Avro partizionati:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Questo esempio illustra il nome del record e lo spazio dei nomi:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Per eseguire query sui dati avro in SQL, registrare il file di dati come tabella o vista temporanea:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Esempio di notebook: leggere e scrivere file Avro

Il notebook seguente illustra come leggere e scrivere file Avro.

Notebook: Leggere e scrivere file Avro

Ottenere il notebook