Ficheiro Avro
Apache Avro é um sistema de serialização de dados. Avro fornece:
- Estruturas de dados avançadas.
- Um formato de dados compacto, rápido e binário.
- Um arquivo contêiner, para armazenar dados persistentes.
- Chamada de procedimento remoto (RPC).
- Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, só vale a pena implementar para linguagens tipadas estaticamente.
A fonte de dados Avro suporta:
- Conversão de esquema: Conversão automática entre registros Apache Spark SQL e Avro.
- Particionamento: Facilmente ler e gravar dados particionados sem qualquer configuração extra.
- Compressão: Compactação para usar ao gravar Avro no disco. Os tipos suportados são
uncompressed
,snappy
edeflate
. Você também pode especificar o nível de deflação. - Nomes de registro: nome de registro e namespace passando um mapa de parâmetros com
recordName
erecordNamespace
.
Consulte também Ler e gravar dados Avro de streaming.
Configuração
Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.
Para ignorar arquivos sem a extensão durante a .avro
leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension
na configuração do Hadoop. A predefinição é false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:
- Codec de compressão:
spark.sql.avro.compression.codec
. Os codecs suportados sãosnappy
edeflate
. O codec padrão ésnappy
. - Se o codec de compressão for
deflate
, você pode definir o nível de compactação com:spark.sql.avro.deflate.level
. O nível padrão é-1
.
Você pode definir essas propriedades na configuração do cluster Spark ou em tempo de execução usando spark.conf.set()
. Por exemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Para o Databricks Runtime 9.1 LTS e superior, você pode alterar o comportamento de inferência de esquema padrão no Avro fornecendo a mergeSchema
opção ao ler arquivos. Definir mergeSchema
como true
irá inferir um esquema de um conjunto de arquivos Avro no diretório de destino e mesclá-los em vez de inferir o esquema de leitura a partir de um único arquivo.
Tipos suportados para conversão Avro -> Spark SQL
Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:
Tipo Avro | Tipo Spark SQL |
---|---|
boolean | BooleanType |
número inteiro | Tipo inteiro |
long | Tipo Longo |
flutuante | Tipo de flutuação |
duplo | Tipo Duplo |
bytes | BinaryType |
string | StringType |
recorde | Tipo de estrutura |
enumeração | StringType |
matriz | Tipo de matriz |
map | Tipo de mapa |
fixo | BinaryType |
união | Consulte Tipos de União. |
Tipos da União
A fonte de dados Avro suporta tipos de leitura union
. A Avro considera os seguintes três tipos como tipos union
:
union(int, long)
mapeia paraLongType
.union(float, double)
mapeia paraDoubleType
.union(something, null)
, ondesomething
é qualquer tipo Avro suportado. Isso mapeia para o mesmo tipo de SQL do Spark que o desomething
, comnullable
definido comotrue
.
Todos os outros union
tipos são tipos complexos. Eles mapeiam para StructType
onde os nomes de campo estão member0
, member1
, e assim por diante, de acordo com os union
membros do . Isso é consistente com o comportamento ao converter entre Avro e Parquet.
Tipos lógicos
A fonte de dados Avro suporta a leitura dos seguintes tipos lógicos Avro:
Tipo lógico Avro | Tipo Avro | Tipo Spark SQL |
---|---|---|
data | número inteiro | Tipo de Data |
timestamp-millis | long | Tipo de carimbo de data/hora |
timestamp-micros | long | Tipo de carimbo de data/hora |
decimal | fixo | Tipo decimal |
decimal | bytes | Tipo decimal |
Nota
A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.
Tipos suportados para Spark SQL -> conversão Avro
Esta biblioteca suporta a escrita de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType
, é convertido em int
); a seguir está uma lista dos poucos casos especiais:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
Tipo de Byte | número inteiro | |
Tipo curto | número inteiro | |
BinaryType | bytes | |
Tipo decimal | fixo | decimal |
Tipo de carimbo de data/hora | long | timestamp-micros |
Tipo de Data | número inteiro | data |
Você também pode especificar todo o esquema Avro de saída com a opção avroSchema
, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro.
As conversões a seguir não são aplicadas por padrão e exigem o esquema Avro especificado pelo usuário:
Tipo Spark SQL | Tipo Avro | Tipo lógico Avro |
---|---|---|
Tipo de Byte | fixo | |
StringType | enumeração | |
Tipo decimal | bytes | decimal |
Tipo de carimbo de data/hora | long | timestamp-millis |
Exemplos
Esses exemplos usam o arquivo episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Este exemplo demonstra um esquema Avro personalizado:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Este exemplo demonstra as opções de compactação Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Este exemplo demonstra registros Avro particionados:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Este exemplo demonstra o nome do registro e o namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Para consultar dados Avro em SQL, registre o arquivo de dados como uma tabela ou exibição temporária:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Exemplo de bloco de notas: ler e escrever ficheiros Avro
O bloco de anotações a seguir demonstra como ler e gravar arquivos Avro.