Partilhar via


Ficheiro Avro

Apache Avro é um sistema de serialização de dados. Avro fornece:

  • Estruturas de dados avançadas.
  • Um formato de dados compacto, rápido e binário.
  • Um arquivo contêiner, para armazenar dados persistentes.
  • Chamada de procedimento remoto (RPC).
  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos RPC. Geração de código como uma otimização opcional, só vale a pena implementar para linguagens tipadas estaticamente.

A fonte de dados Avro suporta:

  • Conversão de esquema: Conversão automática entre registros Apache Spark SQL e Avro.
  • Particionamento: Facilmente ler e gravar dados particionados sem qualquer configuração extra.
  • Compressão: Compactação para usar ao gravar Avro no disco. Os tipos suportados são uncompressed, snappye deflate. Você também pode especificar o nível de deflação.
  • Nomes de registro: nome de registro e namespace passando um mapa de parâmetros com recordName e recordNamespace.

Consulte também Ler e gravar dados Avro de streaming.

Configuração

Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.

Para ignorar arquivos sem a extensão durante a .avro leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. A predefinição é false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:

  • Codec de compressão: spark.sql.avro.compression.codec. Os codecs suportados são snappy e deflate. O codec padrão é snappy.
  • Se o codec de compressão for deflate, você pode definir o nível de compactação com: spark.sql.avro.deflate.level. O nível padrão é -1.

Você pode definir essas propriedades na configuração do cluster Spark ou em tempo de execução usando spark.conf.set(). Por exemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para o Databricks Runtime 9.1 LTS e superior, você pode alterar o comportamento de inferência de esquema padrão no Avro fornecendo a mergeSchema opção ao ler arquivos. Definir mergeSchema como true irá inferir um esquema de um conjunto de arquivos Avro no diretório de destino e mesclá-los em vez de inferir o esquema de leitura a partir de um único arquivo.

Tipos suportados para conversão Avro -> Spark SQL

Esta biblioteca suporta a leitura de todos os tipos Avro. Ele usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipo Avro Tipo Spark SQL
boolean BooleanType
número inteiro Tipo inteiro
long Tipo Longo
flutuante Tipo de flutuação
duplo Tipo Duplo
bytes BinaryType
string StringType
recorde Tipo de estrutura
enumeração StringType
matriz Tipo de matriz
map Tipo de mapa
fixo BinaryType
união Consulte Tipos de União.

Tipos da União

A fonte de dados Avro suporta tipos de leitura union . A Avro considera os seguintes três tipos como tipos union :

  • union(int, long) mapeia para LongType.
  • union(float, double) mapeia para DoubleType.
  • union(something, null), onde something é qualquer tipo Avro suportado. Isso mapeia para o mesmo tipo de SQL do Spark que o de something, com nullable definido como true.

Todos os outros union tipos são tipos complexos. Eles mapeiam para StructType onde os nomes de campo estão member0, member1, e assim por diante, de acordo com os unionmembros do . Isso é consistente com o comportamento ao converter entre Avro e Parquet.

Tipos lógicos

A fonte de dados Avro suporta a leitura dos seguintes tipos lógicos Avro:

Tipo lógico Avro Tipo Avro Tipo Spark SQL
data número inteiro Tipo de Data
timestamp-millis long Tipo de carimbo de data/hora
timestamp-micros long Tipo de carimbo de data/hora
decimal fixo Tipo decimal
decimal bytes Tipo decimal

Nota

A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos suportados para Spark SQL -> conversão Avro

Esta biblioteca suporta a escrita de todos os tipos de Spark SQL no Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType , é convertido em int); a seguir está uma lista dos poucos casos especiais:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte número inteiro
Tipo curto número inteiro
BinaryType bytes
Tipo decimal fixo decimal
Tipo de carimbo de data/hora long timestamp-micros
Tipo de Data número inteiro data

Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As conversões a seguir não são aplicadas por padrão e exigem o esquema Avro especificado pelo usuário:

Tipo Spark SQL Tipo Avro Tipo lógico Avro
Tipo de Byte fixo
StringType enumeração
Tipo decimal bytes decimal
Tipo de carimbo de data/hora long timestamp-millis

Exemplos

Esses exemplos usam o arquivo episodes.avro .

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Este exemplo demonstra um esquema Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Este exemplo demonstra as opções de compactação Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Este exemplo demonstra registros Avro particionados:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Este exemplo demonstra o nome do registro e o namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar dados Avro em SQL, registre o arquivo de dados como uma tabela ou exibição temporária:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemplo de bloco de notas: ler e escrever ficheiros Avro

O bloco de anotações a seguir demonstra como ler e gravar arquivos Avro.

Ler e escrever arquivos Avro bloco de anotações

Obter o bloco de notas