Lees- en schrijfbewerkingen voor het streamen van Avro-gegevens
Apache Avro is een veelgebruikt systeem voor gegevensserialisatie in de streamingwereld. Een typische oplossing is het plaatsen van gegevens in Avro-indeling in Apache Kafka, metagegevens in Confluent Schema Registry en vervolgens query's uitvoeren met een streamingframework dat verbinding maakt met zowel Kafka als Schema Registry.
Azure Databricks ondersteunt de en to_avro
functies voor het from_avro
bouwen van streamingpijplijnen met Avro-gegevens in Kafka en metagegevens in Schema Registry. De functie to_avro
codeert een kolom als binair bestand in Avro-indeling en from_avro
decodeert avro binaire gegevens in een kolom. Beide functies transformeren de ene kolom naar een andere kolom en het SQL-gegevenstype voor invoer/uitvoer kan een complex type of een primitief type zijn.
Notitie
De from_avro
en to_avro
functies:
- Zijn beschikbaar in Python, Scala en Java.
- Kan worden doorgegeven aan SQL-functies in zowel batch- als streamingquery's.
Zie ook de Avro-bestandsgegevensbron.
Handmatig opgegeven schemavoorbeeld
Net als from_json en to_json kunt u en to_avro
met elke binaire kolom gebruikenfrom_avro
. U kunt het Avro-schema handmatig opgeven, zoals in het volgende voorbeeld:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema-voorbeeld
U kunt ook een schema opgeven als een JSON-tekenreeks. Als dit bijvoorbeeld het volgende is:/tmp/user.avsc
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
U kunt een JSON-tekenreeks maken:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Gebruik vervolgens het schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Voorbeeld met schemaregister
Als uw cluster een Schema Registry-service heeft, from_avro
kunt u ermee werken, zodat u het Avro-schema niet handmatig hoeft op te geven.
In het volgende voorbeeld ziet u hoe u een Kafka-onderwerp 't' leest, ervan uitgaande dat de sleutel en waarde al zijn geregistreerd in het schemaregister als onderwerp 't-key' en 't-waarde' van typen STRING
en INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Om to_avro
de volgende redenen komt het standaarduitvoer-Avro-schema mogelijk niet overeen met het schema van het doelonderwerp in de Schema Registry-service:
- De toewijzing van het Spark SQL-type aan het Avro-schema is niet een-op-een. Zie ondersteunde typen voor Spark SQL -> Avro-conversie.
- Als het geconverteerde avro-uitvoerschema van het recordtype is, is
topLevelRecord
de recordnaam en is er standaard geen naamruimte.
Als het standaarduitvoerschema to_avro
overeenkomt met het schema van het doelonderwerp, kunt u het volgende doen:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Anders moet u het schema van het doelonderwerp opgeven in de to_avro
functie:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Verifiëren bij een extern Confluent-schemaregister
In Databricks Runtime 12.2 LTS en hoger kunt u zich verifiëren bij een extern Confluent-schemaregister. In de volgende voorbeelden ziet u hoe u de schemaregisteropties configureert om verificatiereferenties en API-sleutels op te nemen.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
TrustStore- en keystore-bestanden gebruiken in Unity Catalog-volumes
In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalog-volumes gebruiken om te verifiëren bij een Confluent-schemaregister. Werk de configuratie in het vorige voorbeeld bij met behulp van de volgende syntaxis:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
De evolutiemodus van het schema gebruiken met from_avro
In Databricks Runtime 14.2 en hoger kunt u de modus schemaontwikkeling gebruiken met from_avro
. Als u de modus voor schemaontwikkeling inschakelt, wordt de taak geactiveerd UnknownFieldException
nadat de ontwikkeling van het schema is gedetecteerd. Databricks raadt u aan taken te configureren met de modus schemaontwikkeling om automatisch opnieuw op te starten bij taakfouten. Zie Overwegingen voor productie voor gestructureerd streamen.
Schemaontwikkeling is handig als u verwacht dat het schema van uw brongegevens zich in de loop van de tijd ontwikkelt en alle velden uit uw gegevensbron opneemt. Als uw query's al expliciet aangeven welke velden moeten worden opgevraagd in uw gegevensbron, worden toegevoegde velden genegeerd, ongeacht de ontwikkeling van het schema.
Gebruik de avroSchemaEvolutionMode
optie om schemaontwikkeling in te schakelen. In de volgende tabel worden de opties voor de evolutiemodus van het schema beschreven:
Optie | Gedrag |
---|---|
none |
Standaard. Negeert de ontwikkeling van het schema en de taak wordt voortgezet. |
restart |
Genereert een UnknownFieldException bij het detecteren van de evolutie van het schema. Vereist opnieuw opstarten van een taak. |
Notitie
U kunt deze configuratie tussen streamingtaken wijzigen en hetzelfde controlepunt opnieuw gebruiken. Het uitschakelen van schemaontwikkeling kan leiden tot verwijderde kolommen.
De parseringsmodus configureren
U kunt de parseringsmodus configureren om te bepalen of u null-records wilt mislukken of verzenden wanneer de modus voor schemaontwikkeling is uitgeschakeld en het schema zich op een niet-achterwaartse compatibele manier ontwikkelt. Met standaardinstellingen from_avro
mislukt het wanneer er incompatibele schemawijzigingen worden waargenomen.
Gebruik de optie om de mode
parsemodus op te geven. In de volgende tabel wordt de optie voor de parseringsmodus beschreven:
Optie | Gedrag |
---|---|
FAILFAST |
Standaard. Een parseringsfout genereert een SparkException met een errorClass van MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Er wordt een parseringsfout genegeerd en er wordt een null-record verzonden. |
Notitie
Als schemaontwikkeling is ingeschakeld, FAILFAST
worden alleen uitzonderingen veroorzaakt als een record beschadigd is.
Voorbeeld van het ontwikkelen van schema's en het instellen van de parseringsmodus
In het volgende voorbeeld ziet u hoe u de ontwikkeling van schema's inschakelt en de parsemodus opgeeft FAILFAST
met een Confluent Schema Registry:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)