讀取和寫入串流 Avro 資料
Apache Avro 是串流世界中常用的資料序列化系統。 典型的解決方案是將數據以 Avro 格式存放於 Apache Kafka 中,元數據則放入 Confluent 架構註冊表,然後使用可以連接到 Kafka 和 Schema Registry 的串流框架來執行查詢。
Azure Databricks 支援 from_avro
和 to_avro
函式, 在 Kafka 中使用 Avro 數據建置串流管線,以及架構登錄中的元數據。 函式 to_avro
以 Avro 格式將數據行編碼為二進位,from_avro
將 Avro 二進位數據譯碼為數據行。 這兩個函式都會將一個數據行轉換成另一個數據行,而輸入/輸出 SQL 數據類型可以是複雜類型或基本類型。
另請參閱<Avro 檔案資料來源>。
手動指定的架構範例
類似於 from_json 和 to_json,您可以搭配任何二進制欄位使用 from_avro
和 to_avro
。 您可以手動指定 Avro 架構,如下列範例所示:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema 範例
您也可以將架構指定為 JSON 字串。 例如,如果 /tmp/user.avsc
是:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
您可以建立 JSON 字串:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
然後使用 from_avro
中的架構:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
架構登錄的範例
如果您的叢集有架構登錄服務,from_avro
可以使用它,因此您不需要手動指定 Avro 架構。
下列範例說明如何讀取 Kafka 的主題「t」,假設索引鍵和值已在 Schema Registry 中作為類型 STRING
和 INT
的主體註冊為「t-key」和「t-value」。
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
針對 to_avro
,預設輸出 Avro 架構可能不符合架構登錄服務中目標主體的架構,原因如下:
- Spark SQL 類型與 Avro 架構的對應並非一對一。 請參閱Spark SQL 支援的類型 -> Avro 轉換。
- 如果轉換的輸出 Avro 架構是記錄類型,則記錄名稱會
topLevelRecord
,而且預設沒有命名空間。
如果 to_avro
的預設輸出架構符合目標主體的架構,您可以執行下列動作:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
否則,您必須在 to_avro
函式中提供目標主體的架構:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
向外部 Confluent 架構登錄進行驗證
在 Databricks Runtime 12.2 LTS 和更新版本中,您可以向外部 Confluent 架構登錄進行驗證。 下列範例示範如何設定架構登錄選項,以包含驗證認證和 API 金鑰。
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
在 Unity Catalog 卷中使用信任庫和密鑰庫檔案
在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 Unity 目錄磁碟區中的信任存放區和密鑰存放區檔案向 Confluent 架構登錄進行驗證。 使用下列語法更新上一個範例 中的組態:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
搭配使用架構演進模式與 from_avro
Databricks Runtime 14.2 和更新版本中,您可以使用架構演進模式搭配 from_avro
。 啟用架構演進模式會導致作業在偵測架構演進之後擲回 UnknownFieldException
。 Databricks 建議設定具有架構演進模式的作業,以在工作失敗時自動重新啟動。 請參閱結構化串流的生產考量。
如果您預期源數據的架構會隨著時間演進並擷取數據源中的所有欄位,架構演進會很有用。 如果您的查詢已經明確指定數據源中要查詢的欄位,則不論架構演進為何,都會忽略新增的欄位。
使用 [avroSchemaEvolutionMode
] 選項來啟用架構演進。 下表描述架構演進模式的選項:
選項 | 行為 |
---|---|
none |
Default。 忽略架構演進,作業會繼續。 |
restart |
偵測架構演進時擲回 UnknownFieldException 。 需要重新啟動工作。 |
注意
您可以在串流工作之間變更此組態,並重複使用相同的檢查點。 停用架構演進可能會導致欄位被刪除。
設定剖析模式
當架構演進模式被停用時,您可以設定解析模式,以判斷是否要失敗或輸出 Null 記錄,假如架構以非回溯相容的方式演進。 使用預設設定時,from_avro
會因為偵測到不相容的架構變更而失敗。
使用 mode
選項以指定剖析模式。 下表描述剖析模式的選項:
選項 | 行為 |
---|---|
FAILFAST |
Default。 剖析錯誤會擲回 SparkException ,且 errorClass 為 MALFORMED_AVRO_MESSAGE 。 |
PERMISSIVE |
系統會略過剖析錯誤,並發出 Null 記錄。 |
注意
啟用架構演進后,FAILFAST
只有在記錄損毀時才會擲回例外狀況。
使用架構演進和設定剖析模式的範例
下列範例示範如何啟用架構演進,並使用 Confluent 架構登錄指定 FAILFAST
剖析模式:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)