Lees- en schrijfbewerkingen voor het streamen van Avro-gegevens
Apache Avro is een veelgebruikt systeem voor gegevensserialisatie in de streamingwereld. Een typische oplossing is het plaatsen van gegevens in Avro-indeling in Apache Kafka, metagegevens in Confluent Schema Registryen vervolgens query's uitvoeren met een streamingframework dat verbinding maakt met zowel Kafka als Schema Registry.
Azure Databricks ondersteunt de functies from_avro
en to_avro
voor het bouwen van streamingpijplijnen met Avro-gegevens in Kafka en metagegevens in Schema Registry. De functie to_avro
codeert een column als binair bestand in Avro-indeling en from_avro
decodeert binaire Avro-gegevens in een column. Beide functies transformeren één column naar een andere column, en het sql-gegevenstype invoer/uitvoer kan een complex type of een primitief type zijn.
Notitie
De from_avro
en to_avro
functies:
- Zijn beschikbaar in Python, Scala en Java.
- Kan worden doorgegeven aan SQL-functies in zowel batch- als streamingquery's.
Zie ook de Avro-bestandsgegevensbron.
Handmatig opgegeven schema voorbeeld
Net als bij from_json en to_jsonkunt u from_avro
en to_avro
gebruiken met elk binair column. U kunt de Avro-schema handmatig opgeven, zoals in het volgende voorbeeld:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema-voorbeeld
U kunt ook een schema opgeven als een JSON-tekenreeks. Als dit bijvoorbeeld het volgende is:/tmp/user.avsc
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
U kunt een JSON-tekenreeks maken:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Gebruik vervolgens de schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Voorbeeld met Schema-register
Als uw cluster een Schema Registry-service heeft, kunt from_avro
ermee werken, zodat u de Avro-schema niet handmatig hoeft op te geven.
In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp 't' leest, ervan uitgaande dat de sleutel en waarde al zijn geregistreerd in Schema Register als onderwerpen 't-sleutel' en 't-waarde' van typen STRING
en INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Voor to_avro
komt de standaarduitvoer Avro-schema mogelijk niet overeen met de schema van het doelobject in de Schema Registry-service om de volgende redenen:
- De toewijzing van het Spark SQL-type aan Avro-schema is niet een-op-een. Zie ondersteunde typen voor Spark SQL -> Avro-conversie.
- Als de geconverteerde output Avro schema van recordtype is, is de recordnaam
topLevelRecord
en is er standaard geen namespace.
Als de standaarduitvoer schema van to_avro
overeenkomt met de schema van het doelonderwerp, kunt u het volgende doen:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Anders moet u de schema van het doelonderwerp opgeven in de functie to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Authenticeren bij een extern Confluent Schema Register
In Databricks Runtime 12.2 LTS en hoger kunt u zich verifiëren bij een extern Confluent-Schema-register. In de volgende voorbeelden ziet u hoe u uw schema registeropties configureert om verificatie-credentials en API-sleutels op te nemen.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
TrustStore- en keystore-bestanden gebruiken in Unity Catalogvolumes
In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalogvolumes gebruiken om te verifiëren bij een Confluent Schema Registry. Update de configuratie in het vorige voorbeeld met behulp van de volgende syntaxis:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
De evolutiemodus van schema gebruiken met from_avro
In Databricks Runtime 14.2 en hoger kunt u schema evolutiemodus gebruiken met from_avro
. Het inschakelen van schema evolutiemodus zorgt ervoor dat de taak een UnknownFieldException
genereert nadat schema evolutie is gedetecteerd. Databricks raadt aan taken te configureren met schema evolutiemodus om automatisch opnieuw op te starten bij taakfouten. Zie Overwegingen voor productie voor gestructureerd streamen.
Schema evolutie is nuttig als u verwacht dat de schema van uw brongegevens zich in de loop van de tijd ontwikkelen en alle velden uit uw gegevensbron opnemen. Als uw query's al expliciet aangeven welke velden moeten worden opgevraagd in uw gegevensbron, worden toegevoegde velden genegeerd, ongeacht schema evolutie.
Gebruik de optie avroSchemaEvolutionMode
om schema evolutie mogelijk te maken. In de volgende table worden de opties voor schema evolutiemodus beschreven:
Optie | Gedrag |
---|---|
none |
Standaard. Negeert de evolutie van schema en de taak gaat verder. |
restart |
Genereert een UnknownFieldException bij het detecteren van een schema evolutie. Vereist opnieuw opstarten van een taak. |
Notitie
U kunt deze configuratie tussen streamingtaken wijzigen en hetzelfde controlepunt opnieuw gebruiken. Als u schema evolutie uitschakelt, kan dit leiden tot verwijderde columns.
De parseringsmodus configureren
U kunt de parsemodus configureren om te bepalen of u wilt dat het proces mislukt of dat er null records worden uitgegeven wanneer de schema-evolutiemodus is uitgeschakeld en de schema zich op een niet achterwaarts compatibele manier ontwikkelt. Met standaardinstellingen mislukt from_avro
wanneer er incompatibele schema wijzigingen worden waargenomen.
Gebruik de optie om de mode
parsemodus op te geven. In de volgende table wordt de optie voor de parseringsmodus beschreven:
Optie | Gedrag |
---|---|
FAILFAST |
Standaard. Een parseringsfout genereert een SparkException met een errorClass van MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Er wordt een parseringsfout genegeerd en er wordt een null-record verzonden. |
Notitie
Als schema evolutie is ingeschakeld, genereert FAILFAST
alleen uitzonderingen als een record beschadigd is.
Voorbeeld van het gebruik van schema evolutie en het instellen van de parseringsmodus
In het volgende voorbeeld wordt getoond hoe u schema evolutie inschakelt en de FAILFAST
parse modus opgeeft met een Confluent Schema Registry:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)