Läsa och skriva Avro-direktuppspelningsdata
Apache Avro är ett vanligt data serialiseringssystem i strömningsvärlden. En vanlig lösning är att placera data i Avro-format i Apache Kafka, metadata i Confluent Schema Registryoch sedan köra frågor med ett strömningsramverk som ansluter till både Kafka och Schema Registry.
Azure Databricks stöder funktionerna from_avro
och to_avro
för att skapa strömmande pipelines med Avro-data i Kafka och metadata i Schema Registry. Funktionen to_avro
kodar en column som binär i Avro-format och from_avro
avkodar Avro-binära data till en column. Båda funktionerna transformerar en column till en annan column, och SQL-datatypen för indata/utdata kan vara en komplex typ eller en primitiv typ.
Kommentar
Funktionerna from_avro
och to_avro
:
- Finns i Python, Scala och Java.
- Kan skickas till SQL-funktioner i både batch- och strömningsfrågor.
Manuellt angivet schema exempel
På samma sätt som from_json och to_jsonkan du använda from_avro
och to_avro
med valfri binär column. Du kan ange Avro-schema manuellt, som i följande exempel:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema-exempel
Du kan också ange en schema som en JSON-sträng. Om är till exempel /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Du kan skapa en JSON-sträng:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Använd sedan schema i from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Exempel med Schema Registry
Om klustret har en Schema Registry-tjänst kan from_avro
arbeta med den så att du inte behöver ange Avro-schema manuellt.
Följande exempel visar läsning av ett Kafka-ämne "t", förutsatt att nyckeln och värdet redan är registrerade i Schema Registry som ämnen "t-key" och "t-value" av typer STRING
och INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
För to_avro
kanske standardutdatan Avro schema inte matchar schema för målämnet i Schema Registry-tjänsten av följande skäl:
- Mappningen från Spark SQL-typ till Avro schema är inte en-till-en. Se Typer som stöds för Spark SQL –> Avro-konvertering.
- Om den konverterade utdatatypen Avro schema är av posttyp, är postnamnet
topLevelRecord
och det finns inget namnområde som standard.
Om standardutdata schema för to_avro
matchar målämnets schema kan du göra följande:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Annars måste du ange schema för målämnet i funktionen to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Autentisera till ett externt Confluent-Schema-register
I Databricks Runtime 12.2 LTS och senare kan du autentisera till ett externt Confluent-Schema-register. Följande exempel visar hur du konfigurerar dina schema registeralternativ för att inkludera autentisering credentials- och API-nycklar.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Använd säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes
I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes för att autentisera till ett Confluent-Schema-register. Update konfigurationen i föregående exempel med följande syntax:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Använd schema utvecklingsläge med from_avro
I Databricks Runtime 14.2 och senare kan du använda schema evolutionsläge med from_avro
. Om du aktiverar schema evolutionsläge utlöser jobbet en UnknownFieldException
när schema utveckling har upptäckts. Databricks rekommenderar att du konfigurerar jobb med schema utvecklingsläge för att automatiskt starta om vid aktivitetsfel. Se Produktionsöverväganden för strukturerad direktuppspelning.
Schema utvecklingen är användbar om du förväntar dig att schema av dina källdata ska utvecklas med tiden och integrera alla fält från datakällan. Om dina frågor redan uttryckligen anger vilka fält som ska frågas i datakällan ignoreras tillagda fält oavsett schema utveckling.
Använd alternativet avroSchemaEvolutionMode
för att aktivera schema utveckling. Följande table beskriver alternativen för schema evolutionsläge:
Alternativ | Funktionssätt |
---|---|
none |
Standard. Ignorerar utvecklingen av schema och arbetet fortsätter. |
restart |
Genererar en UnknownFieldException vid identifiering av schema utveckling. Kräver en omstart av jobbet. |
Kommentar
Du kan ändra den här konfigurationen mellan direktuppspelningsjobb och återanvända samma kontrollpunkt. Om du inaktiverar schema-uppdateringen kan det leda till förlorade columns.
Konfigurera parsningsläget
Du kan konfigurera parsningsläget för att avgöra om du vill misslyckas eller generera null-poster när schema evolutionsläge är inaktiverat och schema utvecklas på ett icke-bakåtkompatibelt sätt. Med standardinställningarna misslyckas from_avro
när den observerar inkompatibla schema ändringar.
Använd alternativet mode
för att ange parsningsläge. Följande table beskriver alternativet för parsningsläge:
Alternativ | Funktionssätt |
---|---|
FAILFAST |
Standard. Ett parsningsfel genererar en SparkException med en errorClass av MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Ett parsningsfel ignoreras och en nullpost genereras. |
Kommentar
När schema utveckling är aktiverad utlöser FAILFAST
endast undantag om en post är skadad.
Exempel med schema evolutions- och inställningsparseläge
I följande exempel visas hur du aktiverar schema utveckling och anger FAILFAST
parsningsläge med ett Confluent-Schema-register:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)