共用方式為


讀取和寫入串流 Avro 資料

Apache Avro 是串流世界中常用的資料序列化系統。 傳統的解決方案是將資料以 Avro 格式放入 Apache Kafka、將中繼資料放入 Confluent 結構描述登錄,然後使用連線至 Kafka 和結構描述登錄的串流架構執行查詢。

Azure Databricks 支援 from_avroto_avro 函數,以 Kafka 中的 Avro 資料和結構描述登錄中的中繼資料建置串流管線。 函數 to_avro 會將資料行編碼為 Avro 格式的二進位,而 from_avro 會將 Avro 二進位資料解碼為資料行。 這兩個函數都會將一個資料行轉換成另一個資料行,而輸入/輸出 SQL 資料類型可以是複雜類型或基本類型。

注意

from_avroto_avro 函數:

  • 可在 Python、Scala 和 Java 中使用。
  • 可以傳遞至批次和串流查詢中的 SQL 函數。

另請參閱<Avro 檔案資料來源>。

手動指定的結構描述範例

類似於 from_jsonto_json,您可以使用 from_avroto_avro 搭配任何二進位資料行。 您可以手動指定 Avro 結構描述,如下列範例所示:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema 範例

您也可以將結構描述指定為 JSON 字串。 例如,如果 /tmp/user.avsc 是:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

您可以建立 JSON 字串:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

然後使用 from_avro 中的結構描述:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

結構描述登錄的範例

如果您的叢集有結構描述登錄服務,from_avro 可以使用它,因此您不需要手動指定 Avro 結構描述。

下列範例示範如何讀取 Kafka 主題「t」,假設結構描述登錄中已將金鑰和值登錄為類型 STRINGINT 的主體「t-key」和「t-value」:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

針對 to_avro,預設輸出 Avro 結構描述可能不符合結構描述登錄服務中目標主體的結構描述,原因如下:

  • 從 Spark SQL 類型到 Avro 結構描述的對應不是一對一。 請參閱Spark SQL 支援的類型 -> Avro 轉換
  • 如果轉換的輸出 Avro 結構描述是記錄類型,則記錄名稱為 topLevelRecord,且預設沒有命名空間。

如果 to_avro 的預設輸出結構描述符合目標主體的結構描述,您可以執行下列動作:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

否則,您必須在 to_avro 函數中提供目標主體的結構描述:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

對外部 Confluent 結構描述登錄進行驗證

在 Databricks Runtime 12.2 LTS 和更新版本中,您可以對外部 Confluent 結構描述登錄進行驗證。 下列範例示範如何設定結構描述登錄選項,以包含驗證認證和 API 金鑰。

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

在 Unity Catalog 磁碟區中使用信任存放區和金鑰儲存區檔案

在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 Unity Catalog 磁碟區中的信任存放區和金鑰儲存區檔案,對 Confluent 結構描述登錄進行驗證。 使用下列語法可更新上一個範例中的組態:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

使用結構描述演進模式搭配 from_avro

在 Databricks Runtime 14.2 和更新版本中,您可以使用結構描述演進模式搭配 from_avro。 開啟結構描述演進模式會導致工作在偵測結構描述演進之後擲回 UnknownFieldException。 Databricks 建議設定具有結構描述演進模式的工作,以在工作失敗時自動重新啟動。 請參閱結構化串流的生產考量

如果您預期來源資料的結構描述會隨著時間演進並擷取資料來源中的所有欄位,結構描述演進相當實用。 如果您的查詢已經明確指定資料來源中要查詢的欄位,則不論結構描述演進為何,都會忽略新增的欄位。

使用 avroSchemaEvolutionMode 選項可啟用結構描述演進。 下表說明結構描述演進模式的選項:

選項 行為
none Default。 略過結構描述演進,且工作會繼續。
restart 偵測結構描述演進時擲回 UnknownFieldException。 需要重新啟動工作。

注意

您可以在串流工作之間變更此組態,並重複使用相同的檢查點。 停用結構描述演進可能會導致捨棄資料行。

設定剖析模式

您可以設定剖析模式,以判斷是否要在停用結構描述演進模式時失敗或發出 Null 記錄,而結構描述會以非回溯相容的方式演進。 使用預設設定時,from_avro 會在發現不相容的結構描述變更時失敗。

使用 mode 選項以指定剖析模式。 下表說明剖析模式的選項:

選項 行為
FAILFAST Default。 剖析錯誤會擲回 SparkException,且 errorClassMALFORMED_AVRO_MESSAGE
PERMISSIVE 系統會略過剖析錯誤,並發出 Null 記錄。

注意

啟用結構描述演進時,FAILFAST 只會在記錄損毀時擲回例外。

使用結構描述演進和設定剖析模式的範例

下列範例示範如何啟用結構描述演進,並使用 Confluent 結構描述登錄指定 FAILFAST 剖析模式:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)