Avro 파일
Apache Avro는 데이터 직렬화 시스템입니다. Avro는 다음을 제공합니다.
- 풍부한 데이터 구조.
- 작고 빠른 이진 데이터 형식입니다.
- 영구 데이터를 저장하기 위한 컨테이너 파일입니다.
- RPC(원격 프로시저 호출)입니다.
- 동적 언어와의 간단한 통합. 데이터 파일을 읽거나 쓰거나 RPC 프로토콜을 사용하거나 구현하기 위해 코드 생성이 필요하지 않습니다. 선택적인 최적화로서의 코드 생성은 정적으로 유형이 지정된 언어에 대해서만 구현할 가치가 있습니다.
Avro 데이터 원본는 다음을 지원합니다.
- 스키마 변환: Apache Spark SQL과 Avro 레코드 간의 자동 변환.
- 분할: 추가 구성 없이 파티션된 데이터를 쉽게 읽고 쓸 수 있습니다.
- 압축: Avro를 디스크에 쓸 때 사용할 압축입니다. 지원되는 형식은
uncompressed
,snappy
및deflate
입니다. 수축 수준을 지정할 수도 있습니다. - 레코드 이름:
recordName
및recordNamespace
를 사용하여 매개 변수 맵을 전달하여 이름 및 네임스페이스를 기록합니다.
스트리밍 Avro 데이터 읽기 및 쓰기도 참조하세요.
구성
다양한 구성 매개 변수를 사용하여 Avro 데이터 원본의 동작을 변경할 수 있습니다.
읽을 때 .avro
확장자가 없는 파일을 무시하려면 Hadoop 구성에서 매개 변수 avro.mapred.ignore.inputs.without.extension
를 설정할 수 있습니다. 기본값은 false
입니다.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
작성 시 압축을 구성하려면 다음 Spark 속성을 설정합니다.
- 압축 코덱:
spark.sql.avro.compression.codec
. 지원되는 코덱은snappy
및deflate
입니다. 기본 코덱은snappy
입니다. - 압축 코덱이
deflate
인 경우spark.sql.avro.deflate.level
을 사용하여 압축 수준을 설정할 수 있습니다. 기본 수준은-1
입니다.
이러한 속성은 클러스터 Spark 구성에서 또는 런타임에 spark.conf.set()
을 사용하여 설정할 수 있습니다. 예시:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Databricks Runtime 9.1 LTS 이상의 경우 파일을 읽을 때 mergeSchema
옵션을 제공하여 Avro의 기본 스키마 유추 동작을 변경할 수 있습니다. mergeSchema
를 true
로 설정하면 단일 파일에서 읽기 스키마를 유추하는 대신 대상 디렉터리의 Avro 파일 집합에서 스키마를 유추하고 병합합니다.
Avro -> Spark SQL 변환에 지원되는 유형
이 라이브러리는 모든 Avro 유형 읽기를 지원합니다. Avro 유형에서 Spark SQL 형식으로의 다음 매핑을 사용합니다.
Avro 유형 | Spark SQL 형식 |
---|---|
부울 값 | 불리안타입 |
int | 인터저타입 |
long | 롱타입 |
float | 플롯타입 |
double | 더블타입 |
bytes | BinaryType |
string | 스트링타입 |
녹화 | StructType |
enum | 스트링타입 |
배열 | ArrayType |
map | 맵타입 |
fixed | BinaryType |
union | 공용 구조체 유형을 참조하세요. |
공용 구조체 유형
Avro 데이터 원본은 읽기 union
유형을 지원합니다. Avro는 다음 세 가지 유형을 union
유형으로 간주합니다.
union(int, long)
는LongType
에 매핑됩니다.union(float, double)
는DoubleType
에 매핑됩니다.union(something, null)
, 여기서something
은 지원되는 Avro 유형입니다. 이는something
과 동일한 Spark SQL 형식에 매핑되며nullable
은true
로 설정됩니다.
다른 모든 union
유형은 복합 형식입니다. union
의 멤버에 따라 필드 이름이 member0
, member1
등인 StructType
에 매핑됩니다. 이는 Avro와 Parquet 간에 변환할 때의 동작과 일치합니다.
논리적 형식
Avro 데이터 원본은 다음 Avro 논리 유형 읽기를 지원합니다.
Avro 논리 유형 | Avro 유형 | Spark SQL 형식 |
---|---|---|
date | int | 데이트타입 |
timestamp-millis | long | 타임스탬프타입 |
timestamp-micros | long | 타임스탬프타입 |
decimal | fixed | 데시말타입 |
decimal | bytes | 데시말타입 |
참고 항목
Avro 데이터 원본은 Avro 파일에 있는 문서, 별칭 및 기타 속성을 무시합니다.
Spark SQL에 지원되는 유형 -> Avro 변환
이 라이브러리는 Avro에 모든 Spark SQL 형식 쓰기를 지원합니다. 대부분의 유형에서 Spark 유형에서 Avro 유형으로의 매핑은 간단합니다(예: IntegerType
은 int
로 변환됨). 다음은 몇 가지 특별한 경우의 목록입니다.
Spark SQL 형식 | Avro 유형 | Avro 논리 유형 |
---|---|---|
바이트타입 | int | |
쇼트타입 | int | |
BinaryType | bytes | |
데시말타입 | fixed | decimal |
타임스탬프타입 | long | timestamp-micros |
데이트타입 | int | date |
Spark SQL 형식을 다른 Avro 유형으로 변환할 수 있도록 avroSchema
옵션을 사용하여 전체 출력 Avro 스키마를 지정할 수도 있습니다.
다음 변환은 기본적으로 적용되지 않으며 사용자 지정 Avro 스키마가 필요합니다.
Spark SQL 형식 | Avro 유형 | Avro 논리 유형 |
---|---|---|
바이트타입 | fixed | |
스트링타입 | enum | |
데시말타입 | bytes | decimal |
타임스탬프타입 | long | timestamp-millis |
예제
이 예에서는 episodes.avro 파일을 사용합니다.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
이 예에서는 사용자 지정 Avro 스키마를 보여 줍니다.
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
이 예에서는 Avro 압축 옵션을 보여 줍니다.
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
이 예는 분할된 Avro 레코드를 보여 줍니다.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
이 예에서는 레코드 이름과 네임스페이스를 보여 줍니다.
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
SQL에서 Avro 데이터를 쿼리하려면 데이터 파일을 테이블 또는 임시 보기로 등록합니다.
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebook 예제: Avro 파일 읽기 및 쓰기
다음 Notebook은 Avro 파일을 읽고 쓰는 방법을 보여 줍니다.