Buffer del protocollo di lettura e scrittura
Azure Databricks offre supporto nativo per la serializzazione e la deserializzazione tra gli struct apache Spark e i buffer di protocollo (protobuf). Il supporto protobuf viene implementato come trasformatore di dataframe Apache Spark e può essere usato con Structured Streaming o per le operazioni batch.
Come deserializzare e serializzare i buffer dei protocolli
In Databricks Runtime 12.2 LTS e versioni successive è possibile usare from_protobuf
funzioni e to_protobuf
per serializzare e deserializzare i dati. La serializzazione protobuf viene comunemente usata nei carichi di lavoro di streaming.
La sintassi di base per le funzioni protobuf è simile per le funzioni di lettura e scrittura. È necessario importare queste funzioni prima di usarle.
from_protobuf
effettua il cast di una colonna binaria in una struct e to_protobuf
effettua il cast di una colonna struct in binario. È necessario fornire un registro schemi specificato con l'argomento options
o un file descrittore identificato dall'argomento descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Gli esempi seguenti illustrano l'elaborazione di record protobuf binari con from_protobuf()
e la conversione dello struct Spark SQL in protobuf binario con to_protobuf()
.
Usare protobuf con Registro degli Schemi Confluent
Azure Databricks supporta l'uso del Registro schemi Confluent per definire Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
eseguire l'autenticazione in un registro dello schema confluente esterno
Per eseguire l'autenticazione in un Registro schemi Confluent esterno, aggiornare le opzioni del Registro di sistema dello schema in modo da includere le credenziali di autenticazione e le chiavi API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Usare file truststore e keystore dentro i volumi di Unity Catalog
In Databricks Runtime 14.3 LTS e versioni successive è possibile usare i file truststore e keystore nei volumi del catalogo Unity per eseguire l'autenticazione in un Registro schemi Confluent. Aggiorna le tue opzioni del registro dello schema secondo l'esempio seguente:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Usare Protobuf con un file descrittore
È anche possibile fare riferimento a un file di descrittore protobuf disponibile per il cluster di calcolo. Assicurarsi di disporre delle autorizzazioni appropriate per leggere il file, a seconda del percorso.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Opzioni supportate nelle funzioni Protobuf
Le opzioni seguenti sono supportate nelle funzioni Protobuf.
-
mode: determina la modalità di gestione degli errori durante la deserializzazione dei record Protobuf. Gli errori possono essere causati da vari tipi di record in formato non valido, tra cui una mancata corrispondenza tra lo schema effettivo del record e lo schema previsto fornito in
from_protobuf()
.-
valori:
-
FAILFAST
(impostazione predefinita): viene generato un errore quando viene rilevato un record in formato non valido e l'attività non riesce. -
PERMISSIVE
: viene restituito un valore NULL per i record in formato non valido. Usare questa opzione con attenzione perché può comportare l'eliminazione di molti record. Ciò è utile quando una piccola frazione dei record nell'origine non è corretta.
-
-
valori:
-
recursive.fields.max.depth: aggiunge il supporto per i campi ricorsivi. Gli schemi SQL spark non supportano campi ricorsivi. Quando questa opzione non è specificata, i campi ricorsivi non sono consentiti. Per supportare i campi ricorsivi in Protobufs, è necessario espandersi fino a una profondità specificata.
Valori:
-1 (impostazione predefinita): i campi ricorsivi non sono consentiti.
0: i campi ricorsivi vengono eliminati.
1: consente un singolo livello di ricorsione.
[2-10]: specificare una soglia per più ricorsioni, fino a 10 livelli.
L'impostazione di un valore su maggiore di 0 consente campi ricorsivi espandendo i campi annidati fino alla profondità configurata. I valori maggiori di 10 non sono consentiti per evitare inavvertitamente la creazione di schemi molto grandi. Se un messaggio Protobuf ha una profondità maggiore del limite configurato, la struttura Spark restituita viene troncata al raggiungimento del limite di ricorsione.
Esempio: si consideri un protobuf con il campo ricorsivo seguente:
message Person { string name = 1; Person friend = 2; }
Di seguito è riportato l'elenco dello schema finale con valori diversi per questa impostazione:
- Opzione impostata su 1:
STRUCT<name: STRING>
- Opzione impostata su 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Opzione impostata su 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Opzione impostata su 1:
-
convert.any.fields.to.json: questa opzione consente di convertire i campi Protobuf in JSON. Questa funzionalità deve essere abilitata attentamente. La conversione e l'elaborazione JSON sono inefficienti. Inoltre, il campo stringa JSON perde la sicurezza dello schema Protobuf rendendo l'elaborazione downstream soggetta a errori.
Valori:
- False (impostazione predefinita): in fase di esecuzione, tali campi con caratteri jolly possono contenere messaggi Protobuf arbitrari come dati binari. Per impostazione predefinita, tali campi vengono gestiti come un normale messaggio Protobuf. Ha due campi con schema
(STRUCT<type_url: STRING, value: BINARY>)
. Per impostazione predefinita, il campo binariovalue
non viene interpretato in alcun modo. Tuttavia, i dati binari potrebbero non essere utili in pratica per funzionare in alcune applicazioni. - True: l'impostazione di questo valore su True consente la conversione dei
Any
campi in stringhe JSON in fase di esecuzione. Con questa opzione, il file binario viene analizzato e il messaggio Protobuf viene deserializzato in una stringa JSON.
- False (impostazione predefinita): in fase di esecuzione, tali campi con caratteri jolly possono contenere messaggi Protobuf arbitrari come dati binari. Per impostazione predefinita, tali campi vengono gestiti come un normale messaggio Protobuf. Ha due campi con schema
Esempio: considerare due tipi Protobuf definiti come segue:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Con questa opzione abilitata, lo schema per
from_protobuf("col", messageName ="ProtoWithAny")
sarà:STRUCT<event_name: STRING, details: STRING>
.In fase di esecuzione, se
details
il campo contienePerson
il messaggio Protobuf, il valore restituito sarà simile al seguente:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Requisiti:
- Le definizioni per tutti i possibili tipi Protobuf usati nei
Any
campi devono essere disponibili nel file descrittore Protobuf passato afrom_protobuf()
. - Se
Any
Protobuf non viene trovato, verrà generato un errore per tale record. - Questa funzionalità attualmente non è supportata da schema-registry.
- Le definizioni per tutti i possibili tipi Protobuf usati nei
-
emit.default.values: abilita il rendering dei campi con valori zero durante la deserializzazione di Protobuf in uno struct Spark. Questa opzione deve essere usata con moderazione. In genere non è consigliabile dipendere da tali differenze più sottili nella semantica.
valori
- False (impostazione predefinita): quando un campo è vuoto nel Protobuf serializzato, il campo risultante nello struct Spark è null per impostazione predefinita. È più semplice non abilitare questa opzione e considerare
null
come valore predefinito. - True: quando questa opzione è abilitata, tali campi vengono compilati con i valori predefiniti corrispondenti.
- False (impostazione predefinita): quando un campo è vuoto nel Protobuf serializzato, il campo risultante nello struct Spark è null per impostazione predefinita. È più semplice non abilitare questa opzione e considerare
Esempio: si consideri il protobuf seguente con il Protobuf costruito come
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Con questa opzione impostata su False, la struttura Spark dopo aver chiamato
from_protobuf()
sarà tutta nulla:{"name": null, "age": null, "middle_name": "", "salary": null}
. Anche se due campi (age
emiddle_name
) avevano valori impostati, Protobuf non li include in formato wire poiché sono valori predefiniti. - Con questa opzione impostata su True, lo struct Spark dopo aver chiamato
from_protobuf()
sarà:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Il camposalary
rimane Null perché viene dichiarato in modo esplicitooptional
e non è impostato nel record di input.
- Con questa opzione impostata su False, la struttura Spark dopo aver chiamato
-
enums.as.ints: se abilitata, i campi enumerazione in Protobuf vengono visualizzati come campi integer in Spark.
valori
- False (impostazione predefinita)
- True: se abilitata, il rendering dei campi enumerazione in Protobuf viene eseguito come campi integer in Spark.
Esempio: Si consideri il protobuf seguente:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Dato un messaggio Protobuf come
Person(job = ENGINEER)
:- Con questa opzione disabilitata, lo struct Spark corrispondente sarà
{"job": "ENGINEER"}
. - Con questa opzione abilitata, lo struct Spark corrispondente sarà
{"job": 1}
.
Si noti che lo schema per questi campi è diverso in ogni caso (integer anziché stringa predefinita). Tale modifica può influire sullo schema delle tabelle downstream.
- Con questa opzione disabilitata, lo struct Spark corrispondente sarà
Opzioni di Schema Registry
Le opzioni del registro degli schemi seguenti sono pertinenti quando si usa il registro degli schemi con le funzioni Protobuf.
-
schema.registry.subject
- Richiesto
- Specifica l'oggetto per lo schema nel Registro schemi, ad esempio "client-event"
-
schema.registry.address
- Richiesto
- URL del registro degli schemi, ad esempio
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Facoltativo
- Impostazione predefinita:
<NONE>
. - Una registrazione del registro degli schemi per un soggetto può contenere più definizioni Protobuf, proprio come un singolo file
proto
. Quando questa opzione non viene specificata, il primo Protobuf viene usato per lo schema. Specificare il nome del messaggio Protobuf quando non è il primo nella voce. Si consideri, ad esempio, una voce con due definizioni Protobuf: "Person" e "Location" in tale ordine. Se il flusso corrisponde a "Location" invece che a "Person", imposta questa opzione su "Location" (o sul suo nome completo, incluso il pacchetto "com.example.protos.Location").
-
schema.registry.schema.evolution.mode
- Impostazione predefinita: "restart".
- Modalità supportate:
- "restart"
- "nessuno"
- Questa opzione imposta la modalità di evoluzione dello schema per
from_protobuf()
. All'inizio di una query, Spark registra l'ID dello schema più recente per l'oggetto specificato. Questo determina lo schema perfrom_protobuf()
. Un nuovo schema potrebbe essere pubblicato nel registro degli schemi dopo l'avvio della query. Quando un ID schema più recente viene notato in un record in ingresso, indica una modifica allo schema. Questa opzione determina come viene gestita tale modifica allo schema:-
riavviare (impostazione predefinita): attiva un
UnknownFieldException
quando viene notato un ID dello schema più recente. Questa operazione termina la query. Databricks consiglia di configurare i processi da riavviare in caso di errore di query per recepire le modifiche allo schema. - nessun: le modifiche dell'ID dello schema vengono ignorate. I record con ID schema più recente vengono analizzati con lo stesso schema osservato all'inizio della query. Le definizioni Protobuf più recenti devono essere compatibili con le versioni precedenti e i nuovi campi vengono ignorati.
-
riavviare (impostazione predefinita): attiva un
-
confluent.schema.registry.
<schema-registy-client-option>
- Facoltativo
- Il registro degli schemi si connette al registro degli schemi Confluent utilizzando il client del registro degli schemi Confluent
. Qualsiasi opzione di configurazione supportata dal client può essere specificata con il prefisso "confluent.schema.registry". Ad esempio, le due impostazioni seguenti forniscono le credenziali di autenticazione "USER_INFO": - "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"