Fichier Avro
Apache Avro est un système de sérialisation de données. Avro fournit les éléments suivants :
- Structures de données riches.
- Format de données binaire, rapide et compact.
- Un fichier conteneur pour stocker les données persistantes.
- Appel de procédure distante (RPC)
- Intégration simple avec les langages dynamiques. La génération de code n’est pas nécessaire pour lire ou écrire des fichiers de données, ni pour utiliser ou implémenter des protocoles RPC. Génération de code comme une optimisation facultative, qui mérite d’être implémentée uniquement pour les langages typés statiquement.
La source de données Avro prend en charge :
- conversion de schéma : conversion automatique entre Apache Spark SQL et les enregistrements Avro.
- Partitionnement : lecture et écriture aisées de données partitionnées sans configuration supplémentaire.
- Compression : compression à utiliser lors de l’écriture de Avro sur le disque. Les types pris en charge sont
uncompressed
,snappy
, etdeflate
. Vous pouvez également spécifier le niveau de déflation. - Noms d’enregistrement : nom et espace de noms d’enregistrement en passant une carte de paramètres avec
recordName
etrecordNamespace
.
Voir aussi Lire et écrire des données Avro en streaming
Configuration
Vous pouvez modifier le comportement d’une source de données Avro à l’aide de différents paramètres de configuration.
Pour ignorer les fichiers sans l’extension .avro
lors de la lecture, vous pouvez définir le paramètre avro.mapred.ignore.inputs.without.extension
dans la configuration Hadoop. Par défaut, il s’agit de false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Pour configurer la compression lors de l’écriture, définissez les propriétés Spark suivantes :
- Codec de compression :
spark.sql.avro.compression.codec
. Les codecs pris en charge sontsnappy
etdeflate
. Le codec par défaut estsnappy
. - Si le codec de compression est
deflate
, vous pouvez définir le niveau de compression avec :spark.sql.avro.deflate.level
. Le niveau par défaut est-1
.
Vous pouvez définir ces propriétés dans la configuration du cluster Spark ou au moment de l’exécution à l’aide spark.conf.set()
de. Par exemple :
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Pour Databricks Runtime 9,1 LTS et versions ultérieures, vous pouvez modifier le comportement d’inférence de schéma par défaut dans Avro en fournissant l’option lors de la mergeSchema
lecture de fichiers. La définition mergeSchema
de pour true
déduire un schéma à partir d’un ensemble de fichiers Avro dans le répertoire cible et les fusionner au lieu de déduire le schéma de lecture à partir d’un fichier unique.
types pris en charge pour la conversion de SQL Avro - > Spark
Cette bibliothèque prend en charge la lecture de tous les types Avro. il utilise le mappage suivant entre les types Avro et les types de SQL Spark :
Type Avro | Type Spark SQL |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
octets | BinaryType |
string | StringType |
enregistrement | StructType |
enum | StringType |
tableau | ArrayType |
map | MapType |
Corrigé | BinaryType |
union | Voir Types d’union. |
Types union
La source de données Avro prend en charge les types de lecture union
. Avro considère les trois types union
suivants comme des types :
union(int, long)
est mappé àLongType
.union(float, double)
est mappé àDoubleType
.union(something, null)
, oùsomething
est tout type Avro pris en charge. correspond au même type de SQL Spark que celui desomething
, avecnullable
la valeurtrue
.
Tous les autres union
types sont des types complexes. Ils sont mappés à StructType
où les noms de champs sont member0
, member1
, et ainsi de suite, conformément aux membres de union
. Cela est cohérent avec le comportement lors de la conversion entre Avro et parquet.
Types logiques
La source de données Avro prend en charge la lecture des types logiques Avrosuivants :
Type logique Avro | Type Avro | Type Spark SQL |
---|---|---|
Date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
Décimal | Corrigé | DecimalType |
Décimal | octets | DecimalType |
Notes
La source de données Avro ignore les documents, les alias et les autres propriétés présents dans le fichier Avro.
Types pris en charge pour la conversion Spark SQL ->Conversion Avro
Cette bibliothèque prend en charge l’écriture de tous les types de SQL Spark dans Avro. Pour la plupart des types, le mappage entre les types Spark et les types Avro est simple (par exemple IntegerType
, est converti en int
); Voici une liste des rares cas spéciaux :
Type Spark SQL | Type Avro | Type logique Avro |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | octets | |
DecimalType | Corrigé | Décimal |
TimestampType | long | timestamp-micros |
DateType | int | Date |
vous pouvez également spécifier le schéma Avro de sortie entier avec l’option avroSchema
, afin que les types de SQL Spark puissent être convertis en d’autres types Avro.
Les conversions suivantes ne sont pas appliquées par défaut et nécessitent le schéma Avro spécifié par l’utilisateur :
Type Spark SQL | Type Avro | Type logique Avro |
---|---|---|
ByteType | Corrigé | |
StringType | enum | |
DecimalType | octets | Décimal |
TimestampType | long | timestamp-millis |
Exemples
Ces exemples utilisent le fichier épisodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Cet exemple illustre un schéma Avro personnalisé :
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Cet exemple illustre les options de compression Avro :
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Cet exemple illustre les enregistrements Avro partitionnés :
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Cet exemple illustre le nom et l’espace de noms de l’enregistrement :
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
pour interroger des données Avro dans SQL, enregistrez le fichier de données sous la forme d’une table ou d’une vue temporaire :
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Exemple de notebook : Lire et écrire dans des fichiers Avro
Le bloc-notes suivant montre comment lire et écrire des fichiers Avro.