Avro ファイル
Apache Avro はデータ シリアル化システムです。 Avro には次の機能があります。
- 豊富なデータ構造。
- コンパクトで高速なバイナリ データ形式。
- 永続データを格納するコンテナー ファイル。
- リモート プロシージャ呼び出し (RPC)。
- 動的言語との簡単な統合。 データ ファイルの読み取りまたは書き込みや、RPC プロトコルの使用や実装には、コード生成は必要ありません。 オプションの最適化としてのコード生成。静的型指定の言語にのみ実装する価値があります。
Avro データ ソースでは、次の機能がサポートされます。
- スキーマ変換: Apache Spark SQL と Avro レコード間の自動変換。
- パーティション分割: 追加の構成を行わない、パーティション分割されたデータの簡単な読み書き。
- 圧縮: Avro をディスクに書き込むときに使用する圧縮。 サポートされる種類は
uncompressed
、snappy
、deflate
です。 圧縮レベルを指定することもできます。 - レコード名:
recordName
およびrecordNamespace
とともにパラメーターのマップを渡すことによる、レコード名と名前空間。
「ストリーミング Avro データの読み取りと書き込み」も参照してください。
構成
Avro データ ソースの動作は、さまざまな構成パラメーターを使用して変更できます。
読み取り時に .avro
拡張子のないファイルを無視するには、Hadoop 構成で avro.mapred.ignore.inputs.without.extension
パラメーターを設定します。 既定では、 false
です。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
書き込み時に圧縮を構成するには、次の Spark プロパティを設定します。
- 圧縮コーデック:
spark.sql.avro.compression.codec
。 サポートされているコーデックはsnappy
とdeflate
です。 既定のコーデックはsnappy
です。 - 圧縮コーデックが
deflate
の場合は、spark.sql.avro.deflate.level
で圧縮レベルを設定できます。 既定のレベルは-1
です。
これらのプロパティは、クラスター Spark 構成で、または実行時に、spark.conf.set()
を使用して設定できます。 次に例を示します。
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 以降では、ファイルを読み取る際に mergeSchema
オプションを指定することで、Avro での既定のスキーマ推論動作を変更できます。 mergeSchema
を true
に設定すると、1 つのファイルから読み取りスキーマを推論するのではなく、ターゲット ディレクトリ内の一連の Avro ファイルからスキーマが推論され、マージされます。
Avro -> Spark SQL 変換でサポートされる型
このライブラリではすべての Avro 型の読み取りをサポートしています。 Avro 型と Spark SQL 型の次のマッピングが使用されます。
Avro 型 | Spark SQL 型 |
---|---|
Boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
バイト | BinaryType |
string | StringType |
record | StructType |
enum | StringType |
array | ArrayType |
map | MapType |
修正済み | BinaryType |
union | 「共用体型」を参照してください。 |
共用体型
Avro データ ソースでは、union
型の読み取りがサポートされています。 Avro では、次の 3 つの型が union
型と見なされます。
union(int, long)
はLongType
にマップされます。union(float, double)
はDoubleType
にマップされます。union(something, null)
。ただし、something
はサポートされている Avro 型です。 これは、nullable
がtrue
に設定されたsomething
と同じ Spark SQL 型にマップされます。
その他の union
型はすべて複合型です。 これらは、union
のメンバーに従って、フィールド名が member0
、member1
などの StructType
にマップされます。 これは、Avro と Parquet の間で変換する場合の動作と一致します。
論理型
Avro データ ソースでは、次の Avro 論理型の読み取りがサポートされています。
Avro 論理型 | Avro 型 | Spark SQL 型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | 修正済み | DecimalType |
decimal | バイト | DecimalType |
注意
Avro データ ソースは、Avro ファイルに存在するドキュメント、エイリアス、その他のプロパティを無視します。
Spark SQL -> Avro 変換でサポートされる型
このライブラリでは、すべての Spark SQL 型の Avro への書き込みをサポートしています。 ほとんどの型で、Spark 型から Avro 型へのマッピングは簡単です (たとえば、IntegerType
は int
に変換されます)。いくつかの特殊な場合の一覧を次に示します。
Spark SQL 型 | Avro 型 | Avro 論理型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | バイト | |
DecimalType | 修正済み | decimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
また、オプション avroSchema
で出力 Avro スキーマ全体を指定し、Spark SQL 型を他の Avro 型に変換することもできます。
次の変換は既定では適用されず、ユーザー指定の Avro スキーマが必要になります。
Spark SQL 型 | Avro 型 | Avro 論理型 |
---|---|---|
ByteType | 修正済み | |
StringType | enum | |
DecimalType | バイト | decimal |
TimestampType | long | timestamp-millis |
例
これらの例では、episodes.avro ファイルを使用します。
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
この例では、カスタム Avro スキーマを示します。
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
この例では、Avro 圧縮オプションを示します。
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
この例では、パーティション分割された Avro レコードを示します。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
この例では、レコード名と名前空間を示します。
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
SQL 内の Avro データをクエリするには、データ ファイルをテーブルまたは一時ビューとして登録します。
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
ノートブックの例: Avro ファイルの読みl込みと書き込み
次のノートブックは、Avro ファイルを読み書きする方法を示しています。