Läs- och skrivprotokollbuffertar
Azure Databricks har inbyggt stöd för serialisering och deserialisering mellan Apache Spark-structs och protokollbuffertar (protobuf). Protobuf-stöd implementeras som en Apache Spark DataFrame-transformerare och kan användas med strukturerad direktuppspelning eller för batchåtgärder.
Så här deserialiserar och serialiserar du protokollbuffertar
I Databricks Runtime 12.2 LTS och senare kan du använda from_protobuf
och to_protobuf
funktioner för att serialisera och deserialisera data. Protobuf-serialisering används ofta i strömningsarbetsbelastningar.
Den grundläggande syntaxen för protobuf-funktioner liknar läs- och skrivfunktioner. Du måste importera dessa funktioner innan du kan använda dem.
from_protobuf
kastar en binär data column till en struktur och to_protobuf
kastar en struktur column till binär data. Du måste ange antingen ett schema register som anges med argumentet options
eller en beskrivande fil som identifieras av argumentet descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Följande exempel illustrerar bearbetning av binära protobuf-poster med from_protobuf()
och konvertering av Spark SQL-struct till binär protobuf med to_protobuf()
.
Använd protobuf med Confluent Schema Registry
Azure Databricks stöder användning av Confluent Schema Registry för att definiera Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Autentisera till ett externt Confluent-Schema-register
Om du vill autentisera till ett externt Confluent-Schema-register update dina schema registeralternativ för att inkludera autentisering credentials- och API-nycklar.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Använd säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes
I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes för att autentisera till ett Confluent-Schema-register. Update dina schema registerinställningar enligt följande exempel:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Använda Protobuf med en beskrivningsfil
Du kan också referera till en protobuf-beskrivningsfil som är tillgänglig för ditt beräkningskluster. Kontrollera att du har rätt behörighet att läsa filen, beroende på var den finns.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Alternativ som stöds i Protobuf-funktioner
Följande alternativ stöds i Protobuf-funktioner.
-
läge: Avgör hur fel vid deserialisering av Protobuf-poster hanteras. Felen kan orsakas av olika typer av felaktiga poster, inklusive ett matchningsfel mellan postens faktiska schema och den förväntade schema som anges i
from_protobuf()
.-
Values:
-
FAILFAST
(standard): Ett fel utlöses när en felaktig post påträffas och aktiviteten misslyckas. -
PERMISSIVE
: En NULL returneras för felaktiga poster. Använd det här alternativet noggrant eftersom det kan leda till att många poster tappas. Detta är användbart när en liten del av posterna i källan är felaktiga.
-
-
Values:
-
recursive.fields.max.depth: Lägger till stöd för rekursiva fält. Spark SQL-scheman stöder inte rekursiva fält. När det här alternativet inte har angetts tillåts inte rekursiva fält. För att stödja rekursiva fält i Protobufs måste de expandera till ett angivet djup.
Values:
-1 (standard): Rekursiva fält tillåts inte.
0: Rekursiva fält tas bort.
1: Tillåter en enda rekursionsnivå.
[2–10]: Ange ett tröskelvärde för multipel rekursion, upp till 10 nivåer.
Om du anger ett värde till större än 0 kan rekursiva fält expanderas de kapslade fälten till det konfigurerade djupet. Values större än 10 tillåts inte för att undvika att oavsiktligt skapa mycket stora scheman. Om ett Protobuf-meddelande har ett djup utöver det konfigurerade limit, trunkeras den returnerade Spark-strukturen efter rekursionen limit.
Exempel: Överväg en Protobuf med följande rekursiva fält:
message Person { string name = 1; Person friend = 2; }
Följande visar slutet schema med olika values för den här inställningen:
- Alternativ set till 1:
STRUCT<name: STRING>
- Alternativ set till 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Alternativ set till 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Alternativ set till 1:
-
convert.any.fields.to.json: Med det här alternativet kan du konvertera Protobuf Alla fält till JSON. Den här funktionen bör aktiveras noggrant. JSON-konvertering och -bearbetning är ineffektiva. Dessutom förlorar JSON-strängfältet Protobuf schema säkerhet, vilket gör nedströmsbearbetningen benägen för fel.
Values:
- Falskt (standard): Vid körning kan sådana jokerteckenfält innehålla godtyckliga Protobuf-meddelanden som binära data. Som standard hanteras sådana fält som ett vanligt Protobuf-meddelande. Den har två fält med schema
(STRUCT<type_url: STRING, value: BINARY>)
. Som standard tolkas inte det binäravalue
fältet på något sätt. Men binära data kanske inte är praktiska i praktiken för att fungera i vissa program. - Sant: Om du ställer in det här värdet på Sant kan du konvertera
Any
fält till JSON-strängar vid körning. Med det här alternativet parsas binärfilen och Protobuf-meddelandet deserialiseras till en JSON-sträng.
- Falskt (standard): Vid körning kan sådana jokerteckenfält innehålla godtyckliga Protobuf-meddelanden som binära data. Som standard hanteras sådana fält som ett vanligt Protobuf-meddelande. Den har två fält med schema
Exempel: Överväg två Protobuf-typer som definierats på följande sätt:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Med det här alternativet aktiverat är schema för
from_protobuf("col", messageName ="ProtoWithAny")
:STRUCT<event_name: STRING, details: STRING>
.Vid körning, om
details
fältet innehållerPerson
Protobuf-meddelande, ser det returnerade värdet ut så här:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Krav:
- Definitionerna för alla möjliga Protobuf-typer som används i
Any
fält bör vara tillgängliga i protobuf-beskrivningsfilen som skickas tillfrom_protobuf()
. - Om
Any
Protobuf inte hittas resulterar det i ett fel för posten. - Den här funktionen stöds för närvarande inte med schema-registry.
- Definitionerna för alla möjliga Protobuf-typer som används i
-
emit.default.values: Aktiverar renderingsfält med noll values när Protobuf deserialiseras till en Spark-struktur. Det här alternativet bör användas sparsamt. Det är vanligtvis inte tillrådligt att vara beroende av sådana finare skillnader i semantik.
Values
- Falskt (standard): När ett fält är tomt i den serialiserade Protobuf är det resulterande fältet i Spark-structen som standard null. Det är enklare att inte aktivera det här alternativet och behandla
null
som standardvärde. - Sant: När det här alternativet är aktiverat fylls sådana fält med motsvarande standard values.
- Falskt (standard): När ett fält är tomt i den serialiserade Protobuf är det resulterande fältet i Spark-structen som standard null. Det är enklare att inte aktivera det här alternativet och behandla
Exempel: Tänk på följande Protobuf med Protobuf konstruerad som
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Med det här alternativet set till False blir Spark-strukturen efter att anropa
from_protobuf()
alla null-värden:{"name": null, "age": null, "middle_name": "", "salary": null}
. Även om två fält (age
ochmiddle_name
) hade valuesset, inkluderar Protobuf dem inte i trådformat eftersom de är standard values. - Med det här alternativet set till Sant skulle Spark-structen efter att ha anropat
from_protobuf()
vara:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Fältetsalary
förblir null eftersom det uttryckligen deklarerasoptional
och inte set i indataposten.
- Med det här alternativet set till False blir Spark-strukturen efter att anropa
-
enums.as.ints: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
Values
- False (standard)
- Sant: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
Exempel: Överväg följande Protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Ges ett Protobuf-meddelande som
Person(job = ENGINEER)
:- Med det här alternativet inaktiverat skulle motsvarande Spark-struct vara
{"job": "ENGINEER"}
. - Med det här alternativet aktiverat skulle motsvarande Spark-struct vara
{"job": 1}
.
Observera att schema för dessa fält skiljer sig åt i varje enskilt fall (heltal i stället för standardsträng). En sådan ändring kan påverka schema för nedströms tables.
- Med det här alternativet inaktiverat skulle motsvarande Spark-struct vara
Schema registerinställningar
Följande schema registeralternativ är relevanta när du använder schema register med Protobuf-funktioner.
-
schema.registry.subject
- Obligatoriskt
- Anger ämne för schema i Schema Registry, till exempel "klienthändelse"
-
schema.registry.address
- Obligatoriskt
- URL för schema register, till exempel
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Valfritt
- Standard:
<NONE>
. - En schema-registerpost för ett ämne kan innehålla flera Protobuf-definitioner, precis som en enda
proto
fil. När det här alternativet inte anges används den första Protobuf för schema. Ange namnet på Protobuf-meddelandet när det inte är det första i posten. Överväg till exempel en post med två Protobuf-definitioner: "Person" och "Plats" i den ordningen. Om strömmen motsvarar "Plats" i stället för "Person" set det här alternativet till "Plats" (eller dess fullständiga namn, inklusive paketet "com.example.protos.Location").
-
schema.registry.schema.evolution.mode
- Standard: "restart".
- Lägen som stöds:
- "starta om"
- "ingen"
- Det här alternativet anger schema-evolution-läge för
from_protobuf()
. I början av en fråga registrerar Spark den senaste schema-id för det angivna ämnet. Detta avgör schema förfrom_protobuf()
. En ny schema kan publiceras i schema-registret efter att sökningen har påbörjats. När en nyare schema-id visas i en inkommande post, indikerar det en ändring av schema. Det här alternativet avgör hur en sådan ändring av schema hanteras:-
starta om (standard): Utlöser en
UnknownFieldException
när ett nyare schema-id uppmärksammas. Frågan avslutas. Databricks rekommenderar att du konfigurerar jobb för att starta om när ett frågefel inträffar för att implementera schema-ändringar. - inga: Schema-id-ändringar ignoreras. Posterna med nyare schema-id tolkas med samma schema som observerades i början av förfrågan. Nyare Protobuf-definitioner förväntas vara bakåtkompatibla och nya fält ignoreras.
-
starta om (standard): Utlöser en
-
sammansmält.schema.register.
<schema-registy-client-option>
- Valfritt
-
Schema-registry ansluter till Confluent schema-registry med hjälp av Confluent Schema Registry-klienten. Alla konfigurationsalternativ som stöds av klienten kan anges med prefixet "confluent.schema.registry". Följande två inställningar anger till exempel "USER_INFO" autentisering credentials:
- confluent.schema.registry.basic.auth.credentials.source: "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"