다음을 통해 공유


스트리밍 Avro 데이터 읽기 및 쓰기

Apache Avro는 스트리밍에서 일반적으로 사용되는 데이터 serialization 시스템입니다. 일반적인 솔루션은 Apache Kafka에서 Avro 형식으로 데이터를 배치하고, Confluent Schema 레지스트리메타데이터를 입력한 다음, Kafka 및 Schema 레지스트리에 연결하는 스트리밍 프레임워크를 사용하여 쿼리를 실행하는 것입니다.

Azure Databricks는 Kafka의 Avro 데이터와 Schema 레지스트리의 메타데이터를 포함한 스트리밍 파이프라인을 빌드하기 위해 from_avroto_avro 함수를 지원합니다. 이 함수 to_avro는 column을 Avro 형식의 이진으로 인코딩하고, Avro 이진 데이터를 column로 디코딩하기 위한 from_avro입니다. 두 함수 모두 하나의 column를 다른 column로 변환하며, 입력/출력 SQL 데이터 유형은 복합 유형 또는 기본 유형일 수 있습니다.

참고 항목

from_avroto_avro 함수에는 다음과 같은 특성이 있습니다.

  • Python, Scala 및 Java에서 사용할 수 있습니다.
  • 일괄 처리 및 스트리밍 쿼리에서 SQL 함수로 전달할 수 있습니다.

Avro 파일 데이터 원본도 참조하세요.

수동으로 지정한 schema 예제

from_jsonto_json처럼, 마찬가지로 이진 column를 from_avroto_avro와 함께 사용할 수 있습니다. 다음 예제와 같이 Avro schema 수동으로 지정할 수 있습니다.

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema 예제

schema JSON 문자열로 지정할 수도 있습니다. 예를 들어 /tmp/user.avsc가 다음과 같고

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

다음과 같이 JSON 문자열을 만들 수 있습니다.

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

그런 다음 from_avro에서 schema을 사용합니다.

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Schema 레지스트리를 사용하여 예제

클러스터에 Schema 레지스트리 서비스가 있는 경우 from_avro 이를 사용하여 Avro schema 수동으로 지정할 필요가 없도록 할 수 있습니다.

다음 예제에서는 키와 값이 STRINGINT형식의 주체 "t-key" 및 "t-value"로 Schema 레지스트리에 이미 등록되어 있다고 가정하여 Kafka 토픽 "t"를 읽는 방법을 보여 줍니다.

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

to_avro의 경우 다음과 같은 이유로 기본 출력 Avro schema이 Schema 레지스트리 서비스에서 대상 주체의 schema와 일치하지 않을 수 있습니다.

  • Spark SQL 유형에서 Avro schema로의 매핑은 일대일이 아닙니다. Spark SQL에 지원되는 유형 -> Avro 변환을 참조하세요.
  • 변환된 출력 Avro schema이 레코드 형식인 경우, 레코드 이름은 topLevelRecord이며 기본적으로 네임스페이스가 지정되지 않습니다.

대상 주체의 schema가 to_avro의 기본 출력 schema과 일치하는 경우, 다음을 수행할 수 있습니다.

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

그렇지 않으면, to_avro 함수에서 대상 객체의 schema을 반드시 제공해야 합니다.

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

외부 Confluent Schema 레지스트리에 대한 인증 수행

Databricks Runtime 12.2 LTS 이상에서는 외부 Confluent Schema 레지스트리에 인증할 수 있습니다. 다음 예제에서는 인증 credentials 및 API 키를 포함하도록 schema 레지스트리 옵션을 구성하는 방법을 보여 줍니다.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity에서 Catalogvolumes 신뢰 저장소 및 키 저장소 파일 사용

Databricks Runtime 14.3 LTS 이상에서는 truststore 파일 및 키 저장소 파일을 Unity Catalogvolumes에서 사용하여 Confluent Schema 레지스트리에 인증할 수 있습니다. 다음 구문을 사용해 이전 예제의 Update 구성을.

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

schema 진화 모드를 from_avro와 함께 사용

Databricks Runtime 14.2 이상에서는 schema 진화 모드를 from_avro과 함께 사용할 수 있습니다. schema 진화 모드를 사용하도록 설정하면 schema 진화를 감지한 후 작업이 UnknownFieldException 예외를 발생시킵니다. Databricks는 작업 실패 시 자동으로 다시 시작하도록 schema 진화 모드로 작업을 구성하는 것이 좋습니다. 구조적 스트리밍에 대한 프로덕션 고려 사항을 참조하세요.

Schema 발전은 원본 데이터의 schema이 시간이 지남에 따라 발전하고 데이터 원본에서 모든 필드를 포함할 것으로 예상하는 경우에 유용합니다. 쿼리가 데이터 원본에서 쿼리할 필드를 명시적으로 지정한 경우 schema 진화와 관계없이 추가된 필드는 무시됩니다.

avroSchemaEvolutionMode 옵션을 사용하여 schema 진화를 활성화합니다. 다음 table은 schema 진화 모드의 옵션을 설명합니다.

옵션 동작
none Default입니다. schema 진화를 무시하며 작업이 계속됩니다.
restart schema 진화를 감지하면 UnknownFieldException을 발생시킵니다. 작업을 다시 시작해야 합니다.

참고 항목

스트리밍 작업 간에 이 구성을 변경하고 동일한 검사점을 다시 사용할 수 있습니다. schema 진화를 사용하지 않도록 설정하면 columns삭제될 수 있습니다.

구문 분석 모드 구성

구문 모드를 구성하여 schema 진화 모드가 비활성화되고 schema이 이전 버전과 호환되지 않는 방식으로 발전할 때 null 레코드를 내보내거나 실패할지를 결정할 수 있습니다. 기본 설정을 사용하면 호환되지 않는 schema 변경 내용이 발견되면 from_avro이 실패합니다.

mode 옵션을 사용하여 구문 분석 모드를 지정합니다. 다음은 구문 분석 모드 옵션을 설명합니다: table

옵션 동작
FAILFAST Default입니다. 구문 분석 오류로 인해 SparkExceptionerrorClassMALFORMED_AVRO_MESSAGE가 발생합니다.
PERMISSIVE 구문 분석 오류가 무시되고 null 레코드가 내보내집니다.

참고 항목

schema 진화 기능을 활성화하면 FAILFAST은 레코드가 손상된 경우에만 예외를 발생시킵니다.

schema 진화 및 구문 분석 모드 설정 사용 예제

다음 예제에서는 schema 진화를 사용하도록 설정하고 Confluent Schema 레지스트리를 사용하여 FAILFAST 구문 분석 모드를 지정하는 방법을 보여 줍니다.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)