Partager via


Fichier Avro

Apache Avro est un système de sérialisation de données. Avro fournit les éléments suivants :

  • Structures de données riches.
  • Format de données binaire, rapide et compact.
  • Un fichier conteneur pour stocker les données persistantes.
  • Appel de procédure distante (RPC)
  • Intégration simple avec les langages dynamiques. La génération de code n’est pas nécessaire pour lire ou écrire des fichiers de données, ni pour utiliser ou implémenter des protocoles RPC. Génération de code comme une optimisation facultative, qui mérite d’être implémentée uniquement pour les langages typés statiquement.

La source de données Avro prend en charge :

  • conversion de schéma : conversion automatique entre Apache Spark SQL et les enregistrements Avro.
  • Partitionnement : lecture et écriture aisées de données partitionnées sans configuration supplémentaire.
  • Compression : compression à utiliser lors de l’écriture de Avro sur le disque. Les types pris en charge sont uncompressed, snappy, et deflate. Vous pouvez également spécifier le niveau de déflation.
  • Noms d’enregistrement : nom et espace de noms d’enregistrement en passant une carte de paramètres avec recordName et recordNamespace .

Voir aussi Lire et écrire des données Avro en streaming

Configuration

Vous pouvez modifier le comportement d’une source de données Avro à l’aide de différents paramètres de configuration.

Pour ignorer les fichiers sans l’extension .avro lors de la lecture, vous pouvez définir le paramètre avro.mapred.ignore.inputs.without.extension dans la configuration Hadoop. Par défaut, il s’agit de false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Pour configurer la compression lors de l’écriture, définissez les propriétés Spark suivantes :

  • Codec de compression : spark.sql.avro.compression.codec . Les codecs pris en charge sont snappy et deflate . Le codec par défaut est snappy .
  • Si le codec de compression est deflate , vous pouvez définir le niveau de compression avec : spark.sql.avro.deflate.level . Le niveau par défaut est -1.

Vous pouvez définir ces propriétés dans la configuration du cluster Spark ou au moment de l’exécution à l’aide spark.conf.set() de. Par exemple :

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Pour Databricks Runtime 9,1 LTS et versions ultérieures, vous pouvez modifier le comportement d’inférence de schéma par défaut dans Avro en fournissant l’option lors de la mergeSchema lecture de fichiers. La définition mergeSchema de pour true déduire un schéma à partir d’un ensemble de fichiers Avro dans le répertoire cible et les fusionner au lieu de déduire le schéma de lecture à partir d’un fichier unique.

types pris en charge pour la conversion de SQL Avro - > Spark

Cette bibliothèque prend en charge la lecture de tous les types Avro. il utilise le mappage suivant entre les types Avro et les types de SQL Spark :

Type Avro Type Spark SQL
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
octets BinaryType
string StringType
enregistrement StructType
enum StringType
tableau ArrayType
map MapType
Corrigé BinaryType
union Voir Types d’union.

Types union

La source de données Avro prend en charge les types de lecture union . Avro considère les trois types union suivants comme des types :

  • union(int, long) est mappé à LongType.
  • union(float, double) est mappé à DoubleType.
  • union(something, null), où something est tout type Avro pris en charge. correspond au même type de SQL Spark que celui de something , avec nullable la valeur true .

Tous les autres union types sont des types complexes. Ils sont mappés à StructType où les noms de champs sont member0 , member1 , et ainsi de suite, conformément aux membres de union . Cela est cohérent avec le comportement lors de la conversion entre Avro et parquet.

Types logiques

La source de données Avro prend en charge la lecture des types logiques Avrosuivants :

Type logique Avro Type Avro Type Spark SQL
Date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
Décimal Corrigé DecimalType
Décimal octets DecimalType

Notes

La source de données Avro ignore les documents, les alias et les autres propriétés présents dans le fichier Avro.

Types pris en charge pour la conversion Spark SQL ->Conversion Avro

Cette bibliothèque prend en charge l’écriture de tous les types de SQL Spark dans Avro. Pour la plupart des types, le mappage entre les types Spark et les types Avro est simple (par exemple IntegerType , est converti en int ); Voici une liste des rares cas spéciaux :

Type Spark SQL Type Avro Type logique Avro
ByteType int
ShortType int
BinaryType octets
DecimalType Corrigé Décimal
TimestampType long timestamp-micros
DateType int Date

vous pouvez également spécifier le schéma Avro de sortie entier avec l’option avroSchema , afin que les types de SQL Spark puissent être convertis en d’autres types Avro. Les conversions suivantes ne sont pas appliquées par défaut et nécessitent le schéma Avro spécifié par l’utilisateur :

Type Spark SQL Type Avro Type logique Avro
ByteType Corrigé
StringType enum
DecimalType octets Décimal
TimestampType long timestamp-millis

Exemples

Ces exemples utilisent le fichier épisodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Cet exemple illustre un schéma Avro personnalisé :

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Cet exemple illustre les options de compression Avro :

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Cet exemple illustre les enregistrements Avro partitionnés :

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Cet exemple illustre le nom et l’espace de noms de l’enregistrement :

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

pour interroger des données Avro dans SQL, enregistrez le fichier de données sous la forme d’une table ou d’une vue temporaire :

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemple de notebook : Lire et écrire dans des fichiers Avro

Le bloc-notes suivant montre comment lire et écrire des fichiers Avro.

Lire et écrire le bloc-notes des fichiers Avro

Obtenir le notebook