Ler e gravar buffers de protocolo
O Azure Databricks fornece suporte nativo para serialização e desserialização entre structs do Apache Spark e buffers de protocolo (protobuf). O suporte ao protobuf é implementado como um transformador de DataFrame do Apache Spark e pode ser usado com Streaming Estruturado ou para operações em lote.
Como desserializar e serializar buffers de protocolo
No Databricks Runtime 12.2 LTS e posteriores, você pode usar funções from_protobuf
e to_protobuf
para serializar e desserializar dados. A serialização protobuf é comumente usada em cargas de trabalho de streaming.
A sintaxe básica para funções protobuf é semelhante para funções de leitura e gravação. Você deve importar essas funções antes de usar.
O from_protobuf
converte uma coluna binária em um struct e o to_protobuf
converte uma coluna struct em binário. Você deve fornecer um registro de esquema especificado com o argumento options
ou um arquivo descritor identificado pelo argumento descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Os exemplos a seguir ilustram o processamento de registros protobuf binários com from_protobuf()
e a conversão do struct do Spark SQL em protobuf binário com to_protobuf()
.
Usar protobuf com o Registro de Esquema Confluent
O Azure Databricks dá suporte ao uso do Registro de Esquema Confluent para definir o Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Autenticar em um Registro de Esquema Confluent externo
Para se autenticar em um Registro de Esquema Confluent externo, atualize as opções do registro de esquema para incluir credenciais de autenticação e chaves de API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Usar arquivos truststore e de repositório de chaves em volumes do Catálogo do Unity
No Databricks Runtime 14.3 LTS e posteriores, você pode usar arquivos truststore e de repositório de chaves em volumes do Catálogo do Unity para autenticar em um registro de esquema Confluent. Atualize as opções do registro de esquema de acordo com o seguinte exemplo:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Usar o protobuf com um arquivo descritor
Você também pode referenciar um arquivo descritor protobuf que está disponível para o cluster de cálculo. Verifique se você tem permissões adequadas para ler o arquivo, dependendo de sua localização.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Opções com suporte em funções Protobuf
As opções a seguir têm suporte em funções Protobuf.
- modo: determina como são tradados os erros durante a desserialização dos registros Protobuf. Os erros podem ser causados por vários tipos de registros malformados, incluindo uma incompatibilidade entre o esquema real do registro e o esquema esperado fornecido em
from_protobuf()
.- Valores:
FAILFAST
(padrão): um erro é gerado quando um registro malformado é encontrado, e a tarefa falha.PERMISSIVE
: um NULL é retornado para registros malformados. Use essa opção com cuidado, pois ela pode resultar na remoção de muitos registros. Isso é útil quando uma pequena fração dos registros na origem está incorreta.
- Valores:
- recursive.fields.max.depth: adiciona suporte para campos recursivos. Os esquemas SQL do Spark não dão suporte a campos recursivos. Quando essa opção não é especificada, campos recursivos não são permitidos. Para dar suporte a campos recursivos em Protobufs, eles precisam estar expandindo para uma profundidade especificada.
Valores:
-1 (padrão): campos recursivos não são permitidos.
0: campos recursivos são descartados.
1: permite um único nível de recursão.
[2-10]: especifique um limite para várias recursões, até 10 níveis.
Definir um valor como maior que 0 permite campos recursivos expandindo os campos aninhados para a profundidade configurada. Valores maiores que 10 não são permitidos para evitar a criação inadvertida de esquemas muito grandes. Se uma mensagem Protobuf tiver profundidade além do limite configurado, o struct do Spark retornado será truncado após o limite de recursão.
Exemplo: considere um Protobuf com o seguinte campo recursivo:
message Person { string name = 1; Person friend = 2; }
O seguinte lista o esquema final com valores diferentes para esta configuração:
- Opção definida como 1:
STRUCT<name: STRING>
- Opção definida como 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Opção definida como 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Opção definida como 1:
- convert.any.fields.to.json: essa opção permite a conversão os campos Any do Protobuf em JSON. Esse recurso deve ser habilitado com cuidado. Conversão e processamento JSON são ineficientes. Além disso, o campo de cadeia de caracteres JSON perde a segurança do esquema Protobuf, tornando o processamento downstream propenso a erros.
Valores:
- False (padrão): em runtime, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Por padrão, esses campos são tratados como uma mensagem Protobuf normal. Ele tem dois campos com esquema
(STRUCT<type_url: STRING, value: BINARY>)
. Por padrão, o campovalue
binário não é interpretado de forma alguma. Mas os dados binários podem não ser convenientes na prática para funcionar em alguns aplicativos. - True: definir esse valor como True permite converter campos
Any
em cadeias de caracteres JSON em runtime. Com essa opção, o binário é analisado e a mensagem Protobuf é desserializada em uma cadeia de caracteres JSON.
- False (padrão): em runtime, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Por padrão, esses campos são tratados como uma mensagem Protobuf normal. Ele tem dois campos com esquema
Exemplo: considere dois tipos Protobuf definidos da seguinte maneira:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Com essa opção habilitada, o esquema para
from_protobuf("col", messageName ="ProtoWithAny")
seria:STRUCT<event_name: STRING, details: STRING>
.Em tempo de execução, se o campo
details
contiver a mensagem ProtobufPerson
, o valor retornado será semelhante a este:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Requisitos:
- As definições para todos os tipos de Protobuf possíveis usados em campos
Any
devem estar disponíveis no arquivo de descritor Protobuf passado parafrom_protobuf()
. - Se Protobuf
Any
não for encontrado, isso resultará em um erro para esse registro. - No momento, não há suporte para esse recurso com schema-registry.
- As definições para todos os tipos de Protobuf possíveis usados em campos
- emit.default.values: habilita a renderização de campos com valores zero ao desserializar o Protobuf para um struct do Spark. Essa opção deve ser usada com moderação. Geralmente, não é aconselhável depender de diferenças tão pequenas na semântica.
Valores
- False (padrão): quando um campo está vazio no Protobuf serializado, o campo resultante no struct do Spark é nulo por padrão. É mais simples não habilitar essa opção e tratar
null
como o valor padrão. - True: quando essa opção está habilitada, esses campos são preenchidos com valores padrão correspondentes.
- False (padrão): quando um campo está vazio no Protobuf serializado, o campo resultante no struct do Spark é nulo por padrão. É mais simples não habilitar essa opção e tratar
Exemplo: considere o Protobuf a seguir com o Protobuf construído como
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Com essa opção definida como False, o struct do Spark depois de chamar
from_protobuf()
seria nulo:{"name": null, "age": null, "middle_name": "", "salary": null}
. Embora dois campos (age
emiddle_name
) tenham valores definidos, o Protobuf não os inclui em formato de fio, pois são valores padrão. - Com essa opção definida como False, o struct do Spark depois de chamar
from_protobuf()
seria:{"name": "", "age": 0, "middle_name": "", "salary": null}
. O camposalary
permanece nulo, pois é declarado explicitamenteoptional
e não está definido no registro de entrada.
- Com essa opção definida como False, o struct do Spark depois de chamar
- enums.as.ints: quando habilitados, os campos de enumeração no Protobuf são renderizados como campos inteiros no Spark.
Valores
- False (padrão)
- True: quando habilitados, os campos de enumeração no Protobuf são renderizados como campos inteiros no Spark.
Exemplo: considere o seguinte Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Em uma determinada mensagem Protobuf como
Person(job = ENGINEER)
:- Com essa opção desabilitada, o struct do Spark correspondente seria
{"job": "ENGINEER"}
. - Com essa opção desabilitada, o struct do Spark correspondente seria
{"job": 1}
.
Observe que o esquema desses campos é diferente em cada caso (inteiro em vez de cadeia de caracteres padrão). Essa alteração pode afetar o esquema de tabelas downstream.
- Com essa opção desabilitada, o struct do Spark correspondente seria
Opções do Registro de Esquema
As opções de registro de esquema a seguir são relevantes ao usar o registro de esquema com funções Protobuf.
- schema.registry.subject
- Obrigatório
- Especifica o assunto do esquema no Registro de Esquema, como “client-event”
- schema.registry.address
- Obrigatório
- URL para registro de esquema, como
https://schema-registry.example.com:8081
- schema.registry.protobuf.name
- Opcional
- Padrão:
<NONE>
. - Uma entrada de registro de esquema para um assunto pode conter várias definições do Protobuf, assim como um único arquivo
proto
. Quando essa opção não é especificada, o primeiro Protobuf é usado para o esquema. Especifique o nome da mensagem Protobuf quando ela não for a primeira na entrada. Por exemplo, considere uma entrada com duas definições do Protobuf: “Pessoa” e “Localização” nessa ordem. Se o fluxo corresponder a “Location” em vez “de Person”, defina essa opção como “Location” (ou seu nome completo, incluindo o pacote “com.example.protos.Location”).
- schema.registry.schema.evolution.mode
- Padrão: “restart”.
- Modos com suporte:
- “restart”
- “none”
- Essa opção define o modo de schema-evolution para
from_protobuf()
. No início de uma consulta, o Spark registra a ID de esquema mais recente para o assunto especificado. Isso determina o esquema parafrom_protobuf()
. Um novo esquema pode ser publicado no registro de esquema após o início da consulta. Quando uma ID de esquema mais recente é notada em um registro de entrada, ela indica uma alteração no esquema. Essa opção determina como essa alteração no esquema é tratada:- restart (padrão): dispara um
UnknownFieldException
quando uma ID de esquema mais recente é notada. Isso encerra a consulta. O Databricks recomenda a configuração de trabalhos para reiniciar em caso de falha de consulta para selecionar alterações de esquema. - none: as alterações de schema-id são ignoradas. Os registros com schema-id mais recente são analisados com o mesmo esquema que foi observado no início da consulta. Espera-se que as definições mais recentes do Protobuf sejam compatíveis com versões anteriores e novos campos sejam ignorados.
- restart (padrão): dispara um
- confluent.schema.registry.
<schema-registy-client-option>
- Opcional
- O squema-registry conecta-se ao schema-registry do Confluent usando o cliente do Registro de Esquema Confluent. Todas as opções de configuração compatíveis com o cliente podem ser especificadas com o prefixo “confluent.schema.registry”. Por exemplo, as duas configurações a seguir fornecem credenciais de autenticação “USER_INFO”:
- “confluent.schema.registry.basic.auth.credentials.source”: ‘USER_INFO’
- “confluent.schema.registry.basic.auth.user.info”: “
<KEY>
:<SECRET>
”