Dela via


Läs- och skrivprotokollbuffertar

Azure Databricks har inbyggt stöd för serialisering och deserialisering mellan Apache Spark-structs och protokollbuffertar (protobuf). Protobuf-stöd implementeras som en Apache Spark DataFrame-transformerare och kan användas med strukturerad direktuppspelning eller för batchåtgärder.

Så här deserialiserar och serialiserar du protokollbuffertar

I Databricks Runtime 12.2 LTS och senare kan du använda from_protobuf och to_protobuf funktioner för att serialisera och deserialisera data. Protobuf-serialisering används ofta i strömningsarbetsbelastningar.

Den grundläggande syntaxen för protobuf-funktioner liknar läs- och skrivfunktioner. Du måste importera dessa funktioner innan du kan använda dem.

from_protobuf kastar en binär data column till en struktur och to_protobuf kastar en struktur column till binär data. Du måste ange antingen ett schema register som anges med argumentet options eller en beskrivande fil som identifieras av argumentet descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Följande exempel illustrerar bearbetning av binära protobuf-poster med from_protobuf() och konvertering av Spark SQL-struct till binär protobuf med to_protobuf().

Använd protobuf med Confluent Schema Registry

Azure Databricks stöder användning av Confluent Schema Registry för att definiera Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Autentisera till ett externt Confluent-Schema-register

Om du vill autentisera till ett externt Confluent-Schema-register update dina schema registeralternativ för att inkludera autentisering credentials- och API-nycklar.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Använd säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes

I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalogvolumes för att autentisera till ett Confluent-Schema-register. Update dina schema registerinställningar enligt följande exempel:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Använda Protobuf med en beskrivningsfil

Du kan också referera till en protobuf-beskrivningsfil som är tillgänglig för ditt beräkningskluster. Kontrollera att du har rätt behörighet att läsa filen, beroende på var den finns.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Alternativ som stöds i Protobuf-funktioner

Följande alternativ stöds i Protobuf-funktioner.

  • läge: Avgör hur fel vid deserialisering av Protobuf-poster hanteras. Felen kan orsakas av olika typer av felaktiga poster, inklusive ett matchningsfel mellan postens faktiska schema och den förväntade schema som anges i from_protobuf().
    • Values:
      • FAILFAST(standard): Ett fel utlöses när en felaktig post påträffas och aktiviteten misslyckas.
      • PERMISSIVE: En NULL returneras för felaktiga poster. Använd det här alternativet noggrant eftersom det kan leda till att många poster tappas. Detta är användbart när en liten del av posterna i källan är felaktiga.
  • recursive.fields.max.depth: Lägger till stöd för rekursiva fält. Spark SQL-scheman stöder inte rekursiva fält. När det här alternativet inte har angetts tillåts inte rekursiva fält. För att stödja rekursiva fält i Protobufs måste de expandera till ett angivet djup.
    • Values:

      • -1 (standard): Rekursiva fält tillåts inte.

      • 0: Rekursiva fält tas bort.

      • 1: Tillåter en enda rekursionsnivå.

      • [2–10]: Ange ett tröskelvärde för multipel rekursion, upp till 10 nivåer.

        Om du anger ett värde till större än 0 kan rekursiva fält expanderas de kapslade fälten till det konfigurerade djupet. Values större än 10 tillåts inte för att undvika att oavsiktligt skapa mycket stora scheman. Om ett Protobuf-meddelande har ett djup utöver det konfigurerade limit, trunkeras den returnerade Spark-strukturen efter rekursionen limit.

    • Exempel: Överväg en Protobuf med följande rekursiva fält:

      message Person { string name = 1; Person friend = 2; }
      

      Följande visar slutet schema med olika values för den här inställningen:

      • Alternativ set till 1: STRUCT<name: STRING>
      • Alternativ set till 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Alternativ set till 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Med det här alternativet kan du konvertera Protobuf Alla fält till JSON. Den här funktionen bör aktiveras noggrant. JSON-konvertering och -bearbetning är ineffektiva. Dessutom förlorar JSON-strängfältet Protobuf schema säkerhet, vilket gör nedströmsbearbetningen benägen för fel.
    • Values:

      • Falskt (standard): Vid körning kan sådana jokerteckenfält innehålla godtyckliga Protobuf-meddelanden som binära data. Som standard hanteras sådana fält som ett vanligt Protobuf-meddelande. Den har två fält med schema(STRUCT<type_url: STRING, value: BINARY>). Som standard tolkas inte det binära value fältet på något sätt. Men binära data kanske inte är praktiska i praktiken för att fungera i vissa program.
      • Sant: Om du ställer in det här värdet på Sant kan du konvertera Any fält till JSON-strängar vid körning. Med det här alternativet parsas binärfilen och Protobuf-meddelandet deserialiseras till en JSON-sträng.
    • Exempel: Överväg två Protobuf-typer som definierats på följande sätt:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Med det här alternativet aktiverat är schema för from_protobuf("col", messageName ="ProtoWithAny"): STRUCT<event_name: STRING, details: STRING>.

      Vid körning, om details fältet innehåller Person Protobuf-meddelande, ser det returnerade värdet ut så här: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Krav:

      • Definitionerna för alla möjliga Protobuf-typer som används i Any fält bör vara tillgängliga i protobuf-beskrivningsfilen som skickas till from_protobuf().
      • Om Any Protobuf inte hittas resulterar det i ett fel för posten.
      • Den här funktionen stöds för närvarande inte med schema-registry.
  • emit.default.values: Aktiverar renderingsfält med noll values när Protobuf deserialiseras till en Spark-struktur. Det här alternativet bör användas sparsamt. Det är vanligtvis inte tillrådligt att vara beroende av sådana finare skillnader i semantik.
    • Values

      • Falskt (standard): När ett fält är tomt i den serialiserade Protobuf är det resulterande fältet i Spark-structen som standard null. Det är enklare att inte aktivera det här alternativet och behandla null som standardvärde.
      • Sant: När det här alternativet är aktiverat fylls sådana fält med motsvarande standard values.
    • Exempel: Tänk på följande Protobuf med Protobuf konstruerad som Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Med det här alternativet set till False blir Spark-strukturen efter att anropa from_protobuf() alla null-värden: {"name": null, "age": null, "middle_name": "", "salary": null}. Även om två fält (age och middle_name) hade valuesset, inkluderar Protobuf dem inte i trådformat eftersom de är standard values.
      • Med det här alternativet set till Sant skulle Spark-structen efter att ha anropat from_protobuf() vara: {"name": "", "age": 0, "middle_name": "", "salary": null}. Fältet salary förblir null eftersom det uttryckligen deklareras optional och inte set i indataposten.
  • enums.as.ints: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
    • Values

      • False (standard)
      • Sant: När det är aktiverat återges uppräkningsfält i Protobuf som heltalsfält i Spark.
    • Exempel: Överväg följande Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Ges ett Protobuf-meddelande som Person(job = ENGINEER):

      • Med det här alternativet inaktiverat skulle motsvarande Spark-struct vara {"job": "ENGINEER"}.
      • Med det här alternativet aktiverat skulle motsvarande Spark-struct vara {"job": 1}.

      Observera att schema för dessa fält skiljer sig åt i varje enskilt fall (heltal i stället för standardsträng). En sådan ändring kan påverka schema för nedströms tables.

Schema registerinställningar

Följande schema registeralternativ är relevanta när du använder schema register med Protobuf-funktioner.

  • schema.registry.subject
    • Obligatoriskt
    • Anger ämne för schema i Schema Registry, till exempel "klienthändelse"
  • schema.registry.address
    • Obligatoriskt
    • URL för schema register, till exempel https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Valfritt
    • Standard: <NONE>.
    • En schema-registerpost för ett ämne kan innehålla flera Protobuf-definitioner, precis som en enda proto fil. När det här alternativet inte anges används den första Protobuf för schema. Ange namnet på Protobuf-meddelandet när det inte är det första i posten. Överväg till exempel en post med två Protobuf-definitioner: "Person" och "Plats" i den ordningen. Om strömmen motsvarar "Plats" i stället för "Person" set det här alternativet till "Plats" (eller dess fullständiga namn, inklusive paketet "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Standard: "restart".
    • Lägen som stöds:
      • "starta om"
      • "ingen"
    • Det här alternativet anger schema-evolution-läge för from_protobuf(). I början av en fråga registrerar Spark den senaste schema-id för det angivna ämnet. Detta avgör schema för from_protobuf(). En ny schema kan publiceras i schema-registret efter att sökningen har påbörjats. När en nyare schema-id visas i en inkommande post, indikerar det en ändring av schema. Det här alternativet avgör hur en sådan ändring av schema hanteras:
      • starta om (standard): Utlöser en UnknownFieldException när ett nyare schema-id uppmärksammas. Frågan avslutas. Databricks rekommenderar att du konfigurerar jobb för att starta om när ett frågefel inträffar för att implementera schema-ändringar.
      • inga: Schema-id-ändringar ignoreras. Posterna med nyare schema-id tolkas med samma schema som observerades i början av förfrågan. Nyare Protobuf-definitioner förväntas vara bakåtkompatibla och nya fält ignoreras.
  • sammansmält.schema.register.<schema-registy-client-option>
    • Valfritt
    • Schema-registry ansluter till Confluent schema-registry med hjälp av Confluent Schema Registry-klienten. Alla konfigurationsalternativ som stöds av klienten kan anges med prefixet "confluent.schema.registry". Följande två inställningar anger till exempel "USER_INFO" autentisering credentials:
      • confluent.schema.registry.basic.auth.credentials.source: "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"