Sdílet prostřednictvím


Vyrovnávací paměti protokolu pro čtení a zápis

Azure Databricks poskytuje nativní podporu serializace a deserializace mezi strukturami Apache Sparku a vyrovnávacími paměťmi protokolu (protobuf). Podpora Protobuf je implementována jako transformátor datového rámce Apache Spark a lze ji použít se strukturovaným streamováním nebo pro dávkové operace.

Jak deserializovat a serializovat vyrovnávací paměti protokolu

Ve verzi Databricks Runtime 12.2 LTS a vyšší můžete data serializovat a deserializovat pomocí from_protobuf funkcí to_protobuf . Serializace Protobuf se běžně používá v úlohách streamování.

Základní syntaxe funkcí protobuf je podobná pro funkce pro čtení a zápis. Před použitím je nutné tyto funkce importovat.

from_protobuf přetypuje binární column na strukturu a to_protobuf přetypuje strukturu column na binární. Je nutné zadat registr schema zadaný argumentem options nebo soubor popisovače identifikovaný argumentem descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Následující příklady ilustrují zpracování binárních záznamů protobuf pomocí from_protobuf() a převod struktury Spark SQL na binární protobuf s to_protobuf().

Použití protobuf s registrem Schema Confluent

Azure Databricks podporuje k definování protobuf pomocí Registry Confluent.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Ověřte se u externího Confluent registru Schema

Pokud se chcete ověřit v externím registru confluent Schema, update možnosti registru schema, aby zahrnovaly ověřovací credentials a klíče rozhraní API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Použití úložiště důvěryhodnosti a souborů úložiště klíčů v Unity Catalogvolumes

Ve službě Databricks Runtime 14.3 LTS a novější můžete v rámci prostředí Unity Catalogvolumes využít soubory truststore a keystore k autentizaci v registru Confluent Schema. Update možnosti registru schema podle následujícího příkladu:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Použití Protobuf se souborem popisovače

Můžete také odkazovat na soubor popisovače protobuf, který je dostupný pro váš výpočetní cluster. Ujistěte se, že máte správná oprávnění ke čtení souboru v závislosti na jeho umístění.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Podporované možnosti ve funkcích Protobuf

Funkce Protobuf podporují následující možnosti.

  • mode: Určuje, jak se zpracovávají chyby při deserializaci záznamů Protobuf. Chyby mohou být způsobeny různými typy poškozených záznamů, včetně neshody mezi skutečnými schema záznamu a očekávanými schema zadanými v from_protobuf().
    • Values:
      • FAILFAST(výchozí): Při výskytu poškozeného záznamu dojde k chybě a úloha selže.
      • PERMISSIVE: Pro poškozené záznamy se vrátí hodnota NULL. Tuto možnost používejte pečlivě, protože může vést k zahození mnoha záznamů. To je užitečné v případě, že malá část záznamů ve zdroji není správná.
  • recursive.fields.max.depth: Přidá podporu rekurzivních polí. Schémata Spark SQL nepodporují rekurzivní pole. Pokud tato možnost není zadána, rekurzivní pole nejsou povolena. Aby bylo možné podporovat rekurzivní pole v Protobufs, je potřeba je rozšířit na zadanou hloubku.
    • Values:

      • -1 (výchozí): Rekurzivní pole nejsou povolená.

      • 0: Rekurzivní pole se zahodí.

      • 1: Umožňuje jednu úroveň rekurze.

      • [2–10]: Zadejte prahovou hodnotu pro více rekurzí až 10 úrovní.

        Nastavení hodnoty větší než 0 umožňuje rekurzivní pole rozbalením vnořených polí na nakonfigurovanou hloubku. Values větší než 10 není povoleno, aby nedocházelo k neúmyslnému vytváření velmi velkých schémat. Pokud zpráva Protobuf dosáhne hloubky přesahující nakonfigurovaný limit, struktura Sparku je zkrácena po dokončení rekurze limit.

    • Příklad: Zvažte Protobuf s následujícím rekurzivním polem:

      message Person { string name = 1; Person friend = 2; }
      

      Následující seznam uvádí koncový schema s různými values pro toto nastavení:

      • Možnost set na 1: STRUCT<name: STRING>
      • Možnost set na 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Možnost set na 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Tato možnost umožňuje převádět pole Protobuf Any na JSON. Tuto funkci byste měli povolit pečlivě. Převod a zpracování JSON je neefektivní. Kromě toho řetězcové pole JSON ztratí bezpečnost Protobuf schema, což činí další zpracování náchylným k chybám.
    • Values:

      • False (výchozí): Za běhu můžou taková pole se zástupnými cardy obsahovat libovolné zprávy Protobuf jako binární data. Ve výchozím nastavení se tato pole zpracovávají jako normální zpráva Protobuf. Má dvě pole s schema(STRUCT<type_url: STRING, value: BINARY>). Ve výchozím nastavení value binární pole není interpretováno žádným způsobem. Binární data ale nemusí být v praxi vhodná pro práci v některých aplikacích.
      • True: Nastavení této hodnoty na Hodnotu True umožňuje převádět Any pole na řetězce JSON za běhu. Při této možnosti se binární soubor analyzuje a zpráva Protobuf se deserializuje do řetězce JSON.
    • Příklad: Zvažte dva typy Protobuf definované takto:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Pokud je tato možnost povolena, bude schema pro from_protobuf("col", messageName ="ProtoWithAny"): STRUCT<event_name: STRING, details: STRING>.

      Pokud pole v době details běhu obsahuje Person zprávu Protobuf, vrácená hodnota vypadá takto: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Požadavky:

      • Definice všech možných typů Protobuf, které se používají v Any polích, by měly být k dispozici v souboru popisovače Protobuf předané .from_protobuf()
      • Pokud Any protobuf nebyl nalezen, dojde k chybě pro tento záznam.
      • Tato funkce se v současné době nepodporuje u schema-registry.
  • emit.default.values: Povolí zobrazování polí s nulovou values při deserializaci Protobuf na strukturu Spark. Tato možnost by se měla používat střídmě. Obvykle se nedoporučuje záviset na takových jemných rozdílech v sémantice.
    • Values

      • False (výchozí): Pokud je pole v serializovaném protobuf prázdné, výsledné pole ve struktuře Sparku má ve výchozím nastavení hodnotu null. Tuto možnost nelze povolit a považovat null ji za výchozí hodnotu.
      • Pravda: Pokud je tato možnost povolena, jsou tato pole vyplněna odpovídající výchozí hodnotou values.
    • Příklad: Zvažte následující Protobuf s Protobuf vytvořený jako Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Při nastavení možnosti set na False by struktura Sparku po volání from_protobuf() obsahovala pouze hodnoty null: {"name": null, "age": null, "middle_name": "", "salary": null}. I když dvě pole (age a middle_name) měla valuesset, Protobuf je v drátovém formátu neobsahuje, protože jsou výchozí values.
      • Při nastavení možnosti set na hodnotu True bude struktura Sparku po volání from_protobuf() vypadat takto: {"name": "", "age": 0, "middle_name": "", "salary": null}. Pole salary zůstává null, protože je explicitně deklarováno optional a není set ve vstupním záznamu.
  • enums.as.ints: Při povolení se pole výčtu v Protobuf vykreslují jako celočíselná pole ve Sparku.
    • Values

      • False (výchozí)
      • True: Pokud je tato možnost povolená, vykreslí se pole výčtu v Protobuf jako celočíselná pole ve Sparku.
    • Příklad: Zvažte následující protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Vzhledem k tomu, protobuf zpráva jako Person(job = ENGINEER):

      • Pokud je tato možnost zakázaná, odpovídající struktura Sparku by byla {"job": "ENGINEER"}.
      • Pokud je tato možnost povolená, odpovídající struktura Sparku by byla {"job": 1}.

      Všimněte si, že schema pro tato pole se v každém případě liší (celé číslo místo výchozího řetězce). Taková změna může ovlivnit schema následných tables.

možnosti registru Schema

Při používání registru schema s funkcemi Protobuf jsou relevantní následující možnosti registru schema.

  • schema.registry.subject
    • Požaduje se
    • Určuje předmět schema v registru Schema, například "client-event".
  • schema.registry.address
    • Požaduje se
    • Adresa URL registru schema, například https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Volitelné
    • Výchozí hodnota: <NONE>.
    • Položka registru schemapro předmět může obsahovat více definic Protobuf, stejně jako jeden soubor proto. Pokud tato možnost není zadána, použije se první Protobuf pro schema. Zadejte název zprávy Protobuf, pokud není první v položce. Představte si například položku se dvěma definicemi Protobuf: Person (Osoba) a Location (Umístění). Pokud datový proud odpovídá "Umístění" místo "Osoba", set tuto možnost "Umístění" (nebo jeho úplný název včetně balíčku "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Výchozí hodnota: "restart".
    • Podporované režimy:
      • "restart"
      • "none"
    • Tato možnost nastaví režim evoluce schemapro from_protobuf(). Na začátku dotazu Spark zaznamená nejnovější schema-id daného předmětu. Určuje schema pro from_protobuf(). Po spuštění dotazu může být do registru schema publikován nový schema. Když je v příchozím záznamu zaznamenáno novější schema-id, znamená to změnu v schema. Tato možnost určuje, jak se změna kódování na schema zpracovává:
      • restart (výchozí): Spustí UnknownFieldException při zjištění novějšího schema-id. Tím se dotaz ukončí. Databricks doporučuje nakonfigurovat úlohy tak, aby se při selhání dotazu restartovaly a zohlednily změny schema.
      • žádné: změny Schema-id se ignorují. Záznamy s novějším schema-id se zpracovávají s týmž schema, který byl pozorován na začátku dotazu. Očekává se, že novější definice Protobuf budou zpětně kompatibilní a nová pole budou ignorována.
  • Confluent.schemarejstřík.<schema-registy-client-option>
    • Volitelné
    • Schema-registry se připojuje ke Confluent schema-registry pomocí klienta Confluent Registry Schema. Všechny možnosti konfigurace podporované klientem je možné zadat s předponou "confluent".schema.registry. Například následující dvě nastavení poskytují ověřování typu USER_INFO credentials:
      • "confluent.schema.registry.basic.auth.credentials.source: "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY>: <SECRET>"