次の方法で共有


Avro ファイル

Apache Avro はデータ シリアル化システムです。 Avro には次の機能があります。

  • 豊富なデータ構造。
  • コンパクトで高速なバイナリ データ形式。
  • 永続データを格納するコンテナー ファイル。
  • リモート プロシージャ呼び出し (RPC)。
  • 動的言語との簡単な統合。 データ ファイルの読み取りまたは書き込みや、RPC プロトコルの使用や実装には、コード生成は必要ありません。 オプションの最適化としてのコード生成。静的型指定の言語にのみ実装する価値があります。

Avro データ ソースでは、次の機能がサポートされます。

  • スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
  • パーティション分割: 追加の構成を行わない、パーティション分割されたデータの簡単な読み書き。
  • 圧縮: Avro をディスクに書き込むときに使用する圧縮。 サポートされる種類は uncompressedsnappydeflate です。 圧縮レベルを指定することもできます。
  • レコード名: recordName および recordNamespace とともにパラメーターのマップを渡すことによる、レコード名と名前空間。

ストリーミング Avro データの読み取りと書き込み」も参照してください。

構成

Avro データ ソースの動作は、さまざまな構成パラメーターを使用して変更できます。

読み取り時に .avro 拡張子のないファイルを無視するには、Hadoop 構成で avro.mapred.ignore.inputs.without.extension パラメーターを設定します。 既定では、 falseです。

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

書き込み時に圧縮を構成するには、次の Spark プロパティを設定します。

  • 圧縮コーデック: spark.sql.avro.compression.codec。 サポートされているコーデックは snappydeflate です。 既定のコーデックは snappy です。
  • 圧縮コーデックが deflate の場合は、spark.sql.avro.deflate.level で圧縮レベルを設定できます。 既定のレベルは -1 です。

これらのプロパティは、クラスター Spark 構成で、または実行時に、spark.conf.set() を使用して設定できます。 次に例を示します。

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS 以降では、ファイルを読み取る際に mergeSchema オプションを指定することで、Avro での既定のスキーマ推論動作を変更できます。 mergeSchematrue に設定すると、1 つのファイルから読み取りスキーマを推論するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマが推論され、マージされます。

Avro -> Spark SQL 変換でサポートされる型

このライブラリではすべての Avro 型の読み取りをサポートしています。 Avro 型と Spark SQL 型の次のマッピングが使用されます。

Avro 型 Spark SQL 型
Boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
バイト BinaryType
string StringType
record StructType
enum StringType
array ArrayType
map MapType
修正済み BinaryType
union 共用体型」を参照してください。

共用体型

Avro データ ソースでは、union 型の読み取りがサポートされています。 Avro では、次の 3 つの型が union 型と見なされます。

  • union(int, long)LongType にマップされます。
  • union(float, double)DoubleType にマップされます。
  • union(something, null)。ただし、something はサポートされている Avro 型です。 これは、nullabletrue に設定された something と同じ Spark SQL 型にマップされます。

その他の union 型はすべて複合型です。 これらは、union のメンバーに従って、フィールド名が member0member1 などの StructType にマップされます。 これは、Avro と Parquet の間で変換する場合の動作と一致します。

論理型

Avro データ ソースでは、次の Avro 論理型の読み取りがサポートされています。

Avro 論理型 Avro 型 Spark SQL 型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal 修正済み DecimalType
decimal バイト DecimalType

注意

Avro データ ソースは、Avro ファイルに存在するドキュメント、エイリアス、その他のプロパティを無視します。

Spark SQL -> Avro 変換でサポートされる型

このライブラリでは、すべての Spark SQL 型の Avro への書き込みをサポートしています。 ほとんどの型で、Spark 型から Avro 型へのマッピングは簡単です (たとえば、IntegerTypeint に変換されます)。いくつかの特殊な場合の一覧を次に示します。

Spark SQL 型 Avro 型 Avro 論理型
ByteType int
ShortType int
BinaryType バイト
DecimalType 修正済み decimal
TimestampType long timestamp-micros
DateType int date

また、オプション avroSchema で出力 Avro スキーマ全体を指定し、Spark SQL 型を他の Avro 型に変換することもできます。 次の変換は既定では適用されず、ユーザー指定の Avro スキーマが必要になります。

Spark SQL 型 Avro 型 Avro 論理型
ByteType 修正済み
StringType enum
DecimalType バイト decimal
TimestampType long timestamp-millis

これらの例では、episodes.avro ファイルを使用します。

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

この例では、カスタム Avro スキーマを示します。

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

この例では、Avro 圧縮オプションを示します。

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

この例では、パーティション分割された Avro レコードを示します。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

この例では、レコード名と名前空間を示します。

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

SQL 内の Avro データをクエリするには、データ ファイルをテーブルまたは一時ビューとして登録します。

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

ノートブックの例: Avro ファイルの読みl込みと書き込み

次のノートブックは、Avro ファイルを読み書きする方法を示しています。

Avro ファイル ノートブックを読み書きする

ノートブックを入手