Leggere e scrivere dati Avro in streaming
Apache Avro è un sistema di serializzazione dei dati comunemente usato nel mondo di streaming. Una soluzione tipica consiste nell'inserire dati in formato Avro in Apache Kafka, metadati nel Registro Schemi Confluente quindi eseguire query con un framework di streaming che si connette sia a Kafka che al Registro Schemi.
Azure Databricks supporta le funzioni from_avro
, to_avro
,e per creare pipeline di streaming con dati Avro in Kafka e metadati in Schema Registry. La funzione to_avro
codifica una colonna come binaria nel formato Avro e from_avro
decodifica il dato Avro binario in una colonna. Entrambe le funzioni trasformano una colonna in un'altra colonna e il tipo di dati SQL di input/output può essere un tipo complesso o un tipo primitivo.
Nota
Funzioni from_avro
e to_avro
:
- Disponibili in Phyton, Scala e Java.
- È possibile passare alle funzioni SQL sia in batch che in query di streaming.
Vedere anche Origine dati del file Avro.
Esempio di schema specificato manualmente
Analogamente a from_json e to_json, è possibile usare from_avro
e to_avro
con qualsiasi colonna binaria. È possibile specificare manualmente lo schema Avro, come nell'esempio seguente:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Esempio jsonFormatSchema
È anche possibile specificare uno schema come stringa JSON. Ad esempio, se /tmp/user.avsc
è:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
È possibile creare una stringa JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Usare quindi lo schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Esempio con registro dello schema
Se il cluster dispone di un servizio Registro schemi, from_avro
può usarlo in modo che non sia necessario specificare manualmente lo schema Avro.
L'esempio seguente illustra la lettura di un topic Kafka "t", presupponendo che la chiave e il valore siano già registrati nello Schema Registry come soggetti "t-key" e "t-value" di tipi STRING
e INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
Per to_avro
, lo schema Avro di output predefinito potrebbe non corrispondere allo schema del soggetto di destinazione nel servizio Registro Schemi per i motivi seguenti:
- Il mapping dal tipo Spark SQL allo schema Avro non è uno-a-uno. Vedere Tipi supportati per Spark SQL -> Conversione Avro.
- Se lo schema Avro di output convertito è di tipo record, il nome del record è
topLevelRecord
e non esiste alcun namespace per impostazione predefinita.
Se lo schema di output predefinito di to_avro
corrisponde allo schema dell'oggetto di destinazione, è possibile eseguire le operazioni seguenti:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
In caso contrario, è necessario specificare lo schema dell'oggetto di destinazione nella funzione to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
eseguire l'autenticazione in un registro dello schema confluente esterno
In Databricks Runtime 12.2 LTS e versioni successive è possibile eseguire l'autenticazione in un registro dello schema confluente esterno. Gli esempi seguenti illustrano come configurare le opzioni del Registro di sistema dello schema in modo da includere le credenziali di autenticazione e le chiavi API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Usare i file truststore e keystore nei volumi di Unity Catalog
In Databricks Runtime 14.3 LTS e versioni successive è possibile utilizzare i file truststore e keystore nei volumi di Unity Catalog per autenticarsi con un Confluent Schema Registry. Aggiornare la configurazione nell'esempio precedente usando la sintassi seguente:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Usare la modalità di evoluzione dello schema con from_avro
In Databricks Runtime 14.2 e versioni successive è possibile usare la modalità di evoluzione dello schema con from_avro
. L'abilitazione della modalità di evoluzione dello schema fa sì che il processo generi un UnknownFieldException
dopo aver rilevato l'evoluzione dello schema. Databricks consiglia di configurare i processi con la modalità di evoluzione dello schema per il riavvio automatico in caso di errore dell'attività. Vedere Considerazioni sulla produzione per Structured Streaming.
L'evoluzione dello schema è utile se prevedi che lo schema dei tuoi dati di origine si evolva nel tempo e includa tutti i campi dalla sorgente dati. Se le tue query specificano già in modo esplicito quali campi interrogare nell'origine dati, i campi aggiunti vengono ignorati indipendentemente dall'evoluzione dello schema.
Usare l'opzione avroSchemaEvolutionMode
per abilitare l'evoluzione dello schema. Nella tabella seguente vengono descritte le opzioni per la modalità di evoluzione dello schema:
Opzione | Comportamento |
---|---|
none |
Valore predefinito. Ignora l'evoluzione dello schema e il processo continua. |
restart |
Genera un UnknownFieldException durante il rilevamento dell'evoluzione dello schema. Richiede il riavvio. |
Nota
È possibile modificare questa configurazione tra processi di streaming e riutilizzare lo stesso checkpoint. La disabilitazione dell'evoluzione dello schema può comportare la cancellazione di colonne.
Configurare la modalità di analisi
È possibile configurare la modalità di analisi per determinare se si desidera interrompere o generare record Null quando la modalità di evoluzione dello schema è disabilitata e lo schema si evolve in modo non compatibile con le versioni precedenti. Con le impostazioni predefinite, from_avro
fallisce quando rileva modifiche dello schema incompatibili.
Usare l'opzione mode
per specificare la modalità di analisi. La tabella seguente descrive l'opzione per la modalità di analisi:
Opzione | Comportamento |
---|---|
FAILFAST |
Valore predefinito. Un errore di analisi genera un SparkException con un errorClass di MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Un errore di analisi viene ignorato e viene generato un record Null. |
Nota
Con l'evoluzione dello schema abilitata, FAILFAST
genera eccezioni solo se un record è danneggiato.
Esempio di uso dell'evoluzione dello schema e dell'impostazione della modalità di analisi
L'esempio seguente illustra come abilitare l'evoluzione dello Schema e specificare la modalità di analisi FAILFAST
con un Registro Schemi Confluent.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)