Läsa och skriva Avro-direktuppspelningsdata
Apache Avro är ett vanligt data serialiseringssystem i strömningsvärlden. En vanlig lösning är att placera data i Avro-format i Apache Kafka, metadata i Confluent Schema Registryoch sedan köra frågor med ett strömningsramverk som ansluter till både Kafka och Schema Registry.
Azure Databricks stöder funktionerna from_avro
och to_avro
för att skapa strömmande pipelines med Avro-data i Kafka och metadata i Schema Registry. Funktionen to_avro
kodar en kolumn som binär i Avro-format och from_avro
avkodar binära Avro-data till en kolumn. Båda funktionerna transformerar en kolumn till en annan kolumn, och SQL-datatypen för indata/utdata kan vara en komplex typ eller en primitiv typ.
Kommentar
Funktionerna from_avro
och to_avro
:
- Finns i Python, Scala och Java.
- Kan skickas till SQL-funktioner i både batch- och strömningsfrågor.
Manuellt angivet schemaexempel
På samma sätt som from_json och to_jsonkan du använda from_avro
och to_avro
med valfri binär kolumn. Du kan ange Avro-schemat manuellt, som i följande exempel:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema-exempel
Du kan också ange ett schema som en JSON-sträng. Om är till exempel /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Du kan skapa en JSON-sträng:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Använd sedan schemat i from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Exempel med Schema Registry
Om klustret har en Schema Registry-tjänst kan from_avro
arbeta med den så att du inte behöver ange Avro-schemat manuellt.
I följande exempel visas läsning av ett Kafka-ämne "t", förutsatt att nyckeln och värdet redan är registrerade i Schema Registry som ämnen "t-key" och "t-value" av typer STRING
och INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
För to_avro
kanske standardutdataschemat för Avro inte matchar schemat för målämnet i Schema Registry-tjänsten av följande skäl:
- Mappningen från Spark SQL-typ till Avro-schema är inte en-till-en. Se Typer som stöds för Spark SQL –> Avro-konvertering.
- Om det konverterade Avro-schemat är av rekordtyp är rekordnamnet
topLevelRecord
och det finns ingen namnområdesspecifikation som standardinställning.
Om standardutdataschemat för to_avro
matchar schemat för målämnet kan du göra följande:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Annars måste du ange schemat för målämnet i funktionen to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Autentisera till ett externt Confluent-schemaregister
I Databricks Runtime 12.2 LTS och senare kan du autentisera till ett externt Confluent-schemaregister. Följande exempel visar hur du konfigurerar schemaregisteralternativen så att de innehåller autentiseringsuppgifter och API-nycklar.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Använd truststore- och keystore-filer i Unity Catalog-volymer
I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalog-volymer för att autentisera till ett Confluent-schemaregister. Uppdatera konfigurationen i föregående exempel med hjälp av följande syntax:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Använd schemautvecklingsläge med from_avro
I Databricks Runtime 14.2 och senare kan du använda schemautvecklingsläget med from_avro
. Om du aktiverar schemautvecklingsläge genereras ett UnknownFieldException
när schemautvecklingen har upptäckts. Databricks rekommenderar att du konfigurerar jobb med schemautvecklingsläge för att automatiskt starta om vid aktivitetsfel. Se Produktionsöverväganden för strukturerad direktuppspelning.
Schemautveckling är användbart om du förväntar dig att schemat för dina källdata ska utvecklas över tid och mata in alla fält från datakällan. Om dina frågor redan uttryckligen anger vilka fält som ska frågas i datakällan ignoreras tillagda fält oavsett schemautveckling.
Använd alternativet avroSchemaEvolutionMode
för att aktivera schemautveckling. I följande tabell beskrivs alternativen för schemautvecklingsläge:
Alternativ | Funktionssätt |
---|---|
none |
Standard. Ignorerar schemautvecklingen och jobbet fortsätter. |
restart |
Genererar en UnknownFieldException vid identifiering av schemautveckling. Kräver en omstart av jobbet. |
Kommentar
Du kan ändra den här konfigurationen mellan direktuppspelningsjobb och återanvända samma kontrollpunkt. Om du inaktiverar schemautvecklingen kan det resultera i borttagna kolumner.
Konfigurera parsningsläget
Du kan konfigurera parsningsläget för att avgöra om du vill misslyckas eller generera null-poster när schemautvecklingsläget är inaktiverat och schemat utvecklas på ett icke-bakåtkompatibelt sätt. Med standardinställningar misslyckas from_avro
när det observerar inkompatibla schemaändringar.
Använd alternativet mode
för att ange parsningsläge. I följande tabell beskrivs alternativet för parsningsläge:
Alternativ | Funktionssätt |
---|---|
FAILFAST |
Standard. Ett parsningsfel genererar en SparkException med en errorClass av MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Ett parsningsfel ignoreras och en nullpost genereras. |
Kommentar
När schemaevolution är aktiverad utlöser FAILFAST
endast undantag om en post är korrupt.
Exempel med schemautveckling och inställning av parsningsläge
I följande exempel visas hur du aktiverar schemautveckling och anger FAILFAST
parsningsläge med ett Confluent-schemaregister:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)