File Avro
Apache Avro è un sistema di serializzazione dei dati. Avro fornisce:
- Strutture dei dati arricchiti.
- Formato di dati binario compatto e veloce.
- Un file contenitore per archiviare i dati persistenti.
- Remote procedure call (RPC).
- Integrazione semplice con linguaggi dinamici. La generazione del codice non è necessaria per leggere o scrivere file di dati né per usare o implementare protocolli RPC. Generazione di codice come ottimizzazione facoltativa: conviene implementarla solo per i linguaggi tipizzati in modo statico.
L'origine dati Avro supporta:
- Conversione dello schema: conversione automatica tra i record APACHE Spark SQL e Avro.
- Partizionamento: leggere e scrivere facilmente dati partizionati senza alcuna configurazione aggiuntiva.
- Compressione: compressione da usare durante la scrittura di Avro su disco. I tipi supportati sono
uncompressed
,snappy
edeflate
. È anche possibile specificare il livello di deflate. - Nomi dei record: nome e spazio dei nomi dei record tramite inserimento di una mappa di parametri con
recordName
erecordNamespace
.
Vedere anche Leggere e scrivere dati Avro in streaming.
Impostazione
È possibile modificare il comportamento di un'origine dati Avro usando vari parametri di configurazione.
Per ignorare i file senza l'estensione .avro
durante la lettura, è possibile impostare il parametro avro.mapred.ignore.inputs.without.extension
nella configurazione di Hadoop. Il valore predefinito è false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Per configurare la compressione durante la scrittura, impostare le seguenti proprietà di Spark:
- Codec di compressione:
spark.sql.avro.compression.codec
. I codec supportati sonosnappy
edeflate
. Il codec predefinito èsnappy
. - Se il codec di compressione è
deflate
, è possibile impostare il livello di compressione con :spark.sql.avro.deflate.level
. Il livello predefinito è-1
.
È possibile impostare queste proprietà nella configurazione Spark del cluster o in fase di esecuzione usando spark.conf.set()
. Ad esempio:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Per Databricks Runtime 9.1 LTS e versioni successive, è possibile modificare il comportamento di inferenza dello schema predefinito in Avro scegliendo l'opzione mergeSchema
durante la lettura dei file. Se si imposta mergeSchema
su true
l'inferenza di uno schema avverrà unendo i file di un set di file Avro nella directory di destinazione invece che a partire dallo schema di lettura da un singolo file.
Tipi supportati per Avro -> Conversione spark SQL
Questa libreria supporta la lettura di tutti i tipi Avro. Per abbinare i tipi Avro ai tipi Spark SQL, usa il mapping seguente:
Tipo Avro | Tipo SQL Spark |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
bytes | BinaryType |
string | StringType |
registra | StructType |
enum | StringType |
array | ArrayType |
mappa | MapType |
fixed | BinaryType |
union | Vedere Tipi unione. |
Tipi unione
L'origine dati Avro supporta i tipi di lettura union
. Avro presuppone che i tre tipi seguenti siano tipi union
:
union(int, long)
esegue il mapping aLongType
.union(float, double)
esegue il mapping aDoubleType
.union(something, null)
, dovesomething
è qualsiasi tipo Avro supportato. Viene eseguito il mapping allo stesso tipo di Spark SQL disomething
, connullable
impostato sutrue
.
Tutti gli altri tipi union
sono tipi complessi. Eseguono il mapping a StructType
, in cui i nomi dei campi sono member0
, member1
e così via, sulla base dei membri di union
. Questo comportamento è coerente con il comportamento durante la conversione tra Avro e Parquet.
Tipi logici
L'origine dati Avro supporta la lettura dei seguenti tipi logici Avro:
Tipo logico Avro | Tipo Avro | Tipo SQL Spark |
---|---|---|
data | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimale | fixed | DecimalType |
decimale | bytes | DecimalType |
Nota
L'origine dati Avro ignora documenti, alias e altre proprietà presenti nel file Avro.
Tipi supportati per Spark SQL -> Conversione Avro
Questa libreria supporta la scrittura di tutti i tipi Spark SQL in Avro. Per la maggior parte dei tipi, il mapping dai tipi Spark ai tipi Avro è diretta (ad esempio IntegerType
viene convertito in int
); di seguito è riportato un elenco dei pochi casi speciali:
Tipo SQL Spark | Tipo Avro | Tipo logico Avro |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | fixed | decimale |
TimestampType | long | timestamp-micros |
DateType | int | data |
È anche possibile specificare l'intero schema Avro di output con l'opzione avroSchema
, in modo che i tipi Spark SQL possano essere convertiti in altri tipi Avro.
Le conversioni seguenti non vengono applicate per impostazione predefinita e richiedono lo schema Avro specificato dall'utente:
Tipo SQL Spark | Tipo Avro | Tipo logico Avro |
---|---|---|
ByteType | fixed | |
StringType | enum | |
DecimalType | bytes | decimale |
TimestampType | long | timestamp-millis |
Esempi
Questi esempi usano il file episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Questo esempio illustra uno schema Avro personalizzato:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Questo esempio illustra le opzioni di compressione Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Questo esempio illustra i record Avro partizionati:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Questo esempio illustra il nome del record e lo spazio dei nomi:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Per eseguire query sui dati avro in SQL, registrare il file di dati come tabella o vista temporanea:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Esempio di notebook: leggere e scrivere file Avro
Il notebook seguente illustra come leggere e scrivere file Avro.