Vyrovnávací paměti protokolu pro čtení a zápis
Azure Databricks poskytuje nativní podporu serializace a deserializace mezi strukturami Apache Sparku a vyrovnávacími paměťmi protokolu (protobuf). Podpora Protobuf je implementována jako transformátor datového rámce Apache Spark a lze ji použít se strukturovaným streamováním nebo pro dávkové operace.
Jak deserializovat a serializovat vyrovnávací paměti protokolu
Ve verzi Databricks Runtime 12.2 LTS a vyšší můžete data serializovat a deserializovat pomocí from_protobuf
funkcí to_protobuf
. Serializace Protobuf se běžně používá v úlohách streamování.
Základní syntaxe funkcí protobuf je podobná pro funkce pro čtení a zápis. Před použitím je nutné tyto funkce importovat.
from_protobuf
přetypuje binární column na strukturu a to_protobuf
přetypuje strukturu column na binární. Je nutné zadat registr schema zadaný argumentem options
nebo soubor popisovače identifikovaný argumentem descFilePath
.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Následující příklady ilustrují zpracování binárních záznamů protobuf pomocí from_protobuf()
a převod struktury Spark SQL na binární protobuf s to_protobuf()
.
Použití protobuf s registrem Schema Confluent
Azure Databricks podporuje k definování protobuf pomocí
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
Ověřte se u externího Confluent registru Schema
Pokud se chcete ověřit v externím registru confluent Schema, update možnosti registru schema, aby zahrnovaly ověřovací credentials a klíče rozhraní API.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Použití úložiště důvěryhodnosti a souborů úložiště klíčů v Unity Catalogvolumes
Ve službě Databricks Runtime 14.3 LTS a novější můžete v rámci prostředí Unity Catalogvolumes využít soubory truststore a keystore k autentizaci v registru Confluent Schema. Update možnosti registru schema podle následujícího příkladu:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Použití Protobuf se souborem popisovače
Můžete také odkazovat na soubor popisovače protobuf, který je dostupný pro váš výpočetní cluster. Ujistěte se, že máte správná oprávnění ke čtení souboru v závislosti na jeho umístění.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Podporované možnosti ve funkcích Protobuf
Funkce Protobuf podporují následující možnosti.
-
mode: Určuje, jak se zpracovávají chyby při deserializaci záznamů Protobuf. Chyby mohou být způsobeny různými typy poškozených záznamů, včetně neshody mezi skutečnými schema záznamu a očekávanými schema zadanými v
from_protobuf()
.-
Values:
-
FAILFAST
(výchozí): Při výskytu poškozeného záznamu dojde k chybě a úloha selže. -
PERMISSIVE
: Pro poškozené záznamy se vrátí hodnota NULL. Tuto možnost používejte pečlivě, protože může vést k zahození mnoha záznamů. To je užitečné v případě, že malá část záznamů ve zdroji není správná.
-
-
Values:
-
recursive.fields.max.depth: Přidá podporu rekurzivních polí. Schémata Spark SQL nepodporují rekurzivní pole. Pokud tato možnost není zadána, rekurzivní pole nejsou povolena. Aby bylo možné podporovat rekurzivní pole v Protobufs, je potřeba je rozšířit na zadanou hloubku.
Values:
-1 (výchozí): Rekurzivní pole nejsou povolená.
0: Rekurzivní pole se zahodí.
1: Umožňuje jednu úroveň rekurze.
[2–10]: Zadejte prahovou hodnotu pro více rekurzí až 10 úrovní.
Nastavení hodnoty větší než 0 umožňuje rekurzivní pole rozbalením vnořených polí na nakonfigurovanou hloubku. Values větší než 10 není povoleno, aby nedocházelo k neúmyslnému vytváření velmi velkých schémat. Pokud zpráva Protobuf dosáhne hloubky přesahující nakonfigurovaný limit, struktura Sparku je zkrácena po dokončení rekurze limit.
Příklad: Zvažte Protobuf s následujícím rekurzivním polem:
message Person { string name = 1; Person friend = 2; }
Následující seznam uvádí koncový schema s různými values pro toto nastavení:
- Možnost set na 1:
STRUCT<name: STRING>
- Možnost set na 2:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- Možnost set na 3:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- Možnost set na 1:
-
convert.any.fields.to.json: Tato možnost umožňuje převádět pole Protobuf Any na JSON. Tuto funkci byste měli povolit pečlivě. Převod a zpracování JSON je neefektivní. Kromě toho řetězcové pole JSON ztratí bezpečnost Protobuf schema, což činí další zpracování náchylným k chybám.
Values:
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole s schema
(STRUCT<type_url: STRING, value: BINARY>)
. Ve výchozím nastavenívalue
binární pole není interpretováno žádným způsobem. Binární data ale nemusí být v praxi vhodná pro práci v některých aplikacích. - True: Nastavení této hodnoty na Hodnotu True umožňuje převádět
Any
pole na řetězce JSON za běhu. Při této možnosti se binární soubor analyzuje a zpráva Protobuf se deserializuje do řetězce JSON.
- False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole s schema
Příklad: Zvažte dva typy Protobuf definované takto:
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
Pokud je tato možnost povolena, bude schema pro
from_protobuf("col", messageName ="ProtoWithAny")
:STRUCT<event_name: STRING, details: STRING>
.Pokud pole v době
details
běhu obsahujePerson
zprávu Protobuf, vrácená hodnota vypadá takto:('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
.Požadavky:
- Definice všech možných typů Protobuf, které se používají v
Any
polích, by měly být k dispozici v souboru popisovače Protobuf předané .from_protobuf()
- Pokud
Any
protobuf nebyl nalezen, dojde k chybě pro tento záznam. - Tato funkce se v současné době nepodporuje u schema-registry.
- Definice všech možných typů Protobuf, které se používají v
-
emit.default.values: Povolí zobrazování polí s nulovou values při deserializaci Protobuf na strukturu Spark. Tato možnost by se měla používat střídmě. Obvykle se nedoporučuje záviset na takových jemných rozdílech v sémantice.
Values
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
null
ji za výchozí hodnotu. - Pravda: Pokud je tato možnost povolena, jsou tato pole vyplněna odpovídající výchozí hodnotou values.
- False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat
Příklad: Zvažte následující Protobuf s Protobuf vytvořený jako
Person(age=0, middle_name="")
:syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
- Při nastavení možnosti set na False by struktura Sparku po volání
from_protobuf()
obsahovala pouze hodnoty null:{"name": null, "age": null, "middle_name": "", "salary": null}
. I když dvě pole (age
amiddle_name
) měla valuesset, Protobuf je v drátovém formátu neobsahuje, protože jsou výchozí values. - Při nastavení možnosti set na hodnotu True bude struktura Sparku po volání
from_protobuf()
vypadat takto:{"name": "", "age": 0, "middle_name": "", "salary": null}
. Polesalary
zůstává null, protože je explicitně deklarovánooptional
a není set ve vstupním záznamu.
- Při nastavení možnosti set na False by struktura Sparku po volání
-
enums.as.ints: Při povolení se pole výčtu v Protobuf vykreslují jako celočíselná pole ve Sparku.
Values
- False (výchozí)
- True: Pokud je tato možnost povolená, vykreslí se pole výčtu v Protobuf jako celočíselná pole ve Sparku.
Příklad: Zvažte následující protobuf:
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
Vzhledem k tomu, protobuf zpráva jako
Person(job = ENGINEER)
:- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
{"job": "ENGINEER"}
. - Pokud je tato možnost povolená, odpovídající struktura Sparku by byla
{"job": 1}
.
Všimněte si, že schema pro tato pole se v každém případě liší (celé číslo místo výchozího řetězce). Taková změna může ovlivnit schema následných tables.
- Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla
možnosti registru Schema
Při používání registru schema s funkcemi Protobuf jsou relevantní následující možnosti registru schema.
-
schema.registry.subject
- Požaduje se
- Určuje předmět schema v registru Schema, například "client-event".
-
schema.registry.address
- Požaduje se
- Adresa URL registru schema, například
https://schema-registry.example.com:8081
-
schema.registry.protobuf.name
- Volitelné
- Výchozí hodnota:
<NONE>
. - Položka registru schemapro předmět může obsahovat více definic Protobuf, stejně jako jeden soubor
proto
. Pokud tato možnost není zadána, použije se první Protobuf pro schema. Zadejte název zprávy Protobuf, pokud není první v položce. Představte si například položku se dvěma definicemi Protobuf: Person (Osoba) a Location (Umístění). Pokud datový proud odpovídá "Umístění" místo "Osoba", set tuto možnost "Umístění" (nebo jeho úplný název včetně balíčku "com.example.protos.Location").
-
schema.registry.schema.evolution.mode
- Výchozí hodnota: "restart".
- Podporované režimy:
- "restart"
- "none"
- Tato možnost nastaví režim evoluce schemapro
from_protobuf()
. Na začátku dotazu Spark zaznamená nejnovější schema-id daného předmětu. Určuje schema profrom_protobuf()
. Po spuštění dotazu může být do registru schema publikován nový schema. Když je v příchozím záznamu zaznamenáno novější schema-id, znamená to změnu v schema. Tato možnost určuje, jak se změna kódování na schema zpracovává:-
restart (výchozí): Spustí
UnknownFieldException
při zjištění novějšího schema-id. Tím se dotaz ukončí. Databricks doporučuje nakonfigurovat úlohy tak, aby se při selhání dotazu restartovaly a zohlednily změny schema. - žádné: změny Schema-id se ignorují. Záznamy s novějším schema-id se zpracovávají s týmž schema, který byl pozorován na začátku dotazu. Očekává se, že novější definice Protobuf budou zpětně kompatibilní a nová pole budou ignorována.
-
restart (výchozí): Spustí
-
Confluent.schemarejstřík.
<schema-registy-client-option>
- Volitelné
-
Schema-registry se připojuje ke Confluent schema-registry pomocí klienta Confluent Registry Schema. Všechny možnosti konfigurace podporované klientem je možné zadat s předponou "confluent".schema.registry. Například následující dvě nastavení poskytují ověřování typu USER_INFO credentials:
- "confluent.schema.registry.basic.auth.credentials.source: "USER_INFO"
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"