Avro 檔案
Apache Avro 是資料序列化系統。 Avro 提供:
- 豐富的資料結構。
- 壓縮、快速的二進位資料格式。
- 用來儲存持續性資料的容器檔案。
- 遠端程序呼叫 (RPC)。
- 與動態語言的簡單整合。 不需要產生代碼即可讀取或寫入資料檔案,也不需要使用或實施 RPC 通訊協定。 代碼產生為選擇性最佳化,僅值得實施於靜態類型語言。
Avro資料來源支援:
- 結構描述轉換:Apache Spark SQL 與 Avro 記錄之間的自動轉換。
- 資料分割:輕鬆讀取和寫入資料分割資料,而不需要任何額外的設定。
- 壓縮:將 Avro 寫入磁碟時要使用的壓縮。 支援的類型為
uncompressed
、snappy
和deflate
。 您也可以指定壓抑的層級。 - 記錄名稱:使用
recordName
和recordNamespace
傳遞參數的對應來記錄名稱和命名空間。
也請參閱讀取和寫入串流 Avro 資料。
組態
您可以使用各種組態參數來變更 Avro 資料來源的行為。
若要在讀取時略過沒有 .avro
延伸項目的檔案,您可以在 Hadoop 組態中設定參數 avro.mapred.ignore.inputs.without.extension
。 預設值為 false
。
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
若要在寫入時設定壓縮,請設定下列 Spark 屬性:
- 壓縮編解碼器:
spark.sql.avro.compression.codec
。 支援的編解碼器為snappy
和deflate
。 預設編解碼器為snappy
。 - 如果壓縮編解碼器是
deflate
,您可以使用下列來設定壓縮層級:spark.sql.avro.deflate.level
。 預設層級是-1
。
您可以在叢集 Spark 組態或執行階段使用 spark.conf.set()
來設定這些屬性。 例如:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
針對 Databricks Runtime 9.1 LTS 和更新版本,您可以在讀取檔案時在 Avro 中提供mergeSchema
選項,以變更 Avro 中的預設結構描述推斷行為。 將 mergeSchema
設定為 true
會從目標目錄中的一組 Avro 檔案推斷結構描述,並加以合併,而不是從單一檔案推斷讀取結構描述。
Avro 支援的類型 -> Spark SQL 轉換
此程式庫支援讀取所有 Avro 類型。 它會使用下列從 Avro 類型到 Spark SQL 類型的對應:
Avro 類型 | Spark SQL 類型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
bytes | BinaryType |
字串 | StringType |
記錄 | StructType |
enum | StringType |
陣列 | ArrayType |
map | MapType |
fixed | BinaryType |
union | 請參閱等位型別。 |
等位型別
Avro 資料來源支援讀取 union
類型。 Avro 會將下列三種類型視為 union
類型:
union(int, long)
對應至LongType
。union(float, double)
對應至DoubleType
。union(something, null)
,其中something
是任何支援的 Avro 類型。 這會對應至與something
相同的 Spark SQL 類型,並將nullable
設定為true
。
所有其他的 union
類型都是複雜類型。 它們會對應至 StructType
,而其欄位名稱為 member0
、 member1
等位置,且將依照 union
的成員進行對應。 這與 Avro 與 Parquet 之間轉換時的行為一致。
邏輯類型
Avro 資料來源支援讀取下列 Avro 邏輯類型:
Avro 邏輯類型 | Avro 類型 | Spark SQL 類型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
注意
Avro 資料來源略過 Avro 檔案中存在的文件、別名和其他屬性。
Spark SQL 支援的類型 -> Avro 轉換
此程式庫支援將所有 Spark SQL類型寫入 Avro。 對大部分類型而言,從 Spark 類型到 Avro 類型的對應很簡單(例如 IntegerType
會轉換成 int
):下列是少數特殊案例的清單:
Spark SQL 類型 | Avro 類型 | Avro 邏輯類型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DecimalType | fixed | decimal |
TimestampType | long | timestamp-micros |
DateType | int | date |
您也可以使用選項 avroSchema
來指定整個輸出的 Avro 結構描述,讓 Spark SQL 類型可以轉換成其他 Avro 類型。
預設不會套用下列轉換,而且需要使用者指定的 Avro 結構描述:
Spark SQL 類型 | Avro 類型 | Avro 邏輯類型 |
---|---|---|
ByteType | fixed | |
StringType | enum | |
DecimalType | bytes | decimal |
TimestampType | long | timestamp-millis |
範例
這些範例會使用 episodes.avro 檔案。
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
此範例示範自訂的 Avro 結構描述:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
此範例示範 Avro 壓縮選項:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
此範例示範分割的 Avro 記錄:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
此範例示範記錄名稱與命名空間:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
若要在 SQL 中查詢 Avro 資料,請將資料檔案註冊為資料表或暫存檢視:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
筆記本範例:讀取和寫入 Avro 檔案
下列筆記本示範如何讀取和寫入 Avro 檔案。