Delen via


Protocolbuffers lezen en schrijven

Azure Databricks biedt systeemeigen ondersteuning voor serialisatie en deserialisatie tussen Apache Spark-structs en protocolbuffers (protobuf). Protobuf-ondersteuning wordt geïmplementeerd als een Apache Spark DataFrame-transformator en kan worden gebruikt met Structured Streaming of voor batchbewerkingen.

Protocolbuffers deserialiseren en serialiseren

In Databricks Runtime 12.2 LTS en hoger kunt from_protobufto_protobuf u gegevens serialiseren en deserialiseren. Protobuf-serialisatie wordt vaak gebruikt in streamingworkloads.

De basissyntaxis voor protobuf-functies is vergelijkbaar voor lees- en schrijffuncties. U moet deze functies importeren voordat u deze gebruikt.

from_protobuf zet binaire gegevens column om naar een struct, en to_protobuf zet een struct column om naar binaire gegevens. U moet een schema register opgeven dat is opgegeven met het argument options of een descriptorbestand dat is geïdentificeerd door het argument descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

De volgende voorbeelden illustreren het verwerken van binaire protobuf-records met from_protobuf() en het converteren van Spark SQL-struct naar binaire protobuf met to_protobuf().

Protobuf gebruiken met Confluent Schema Registry

Azure Databricks biedt ondersteuning voor het gebruik van de Confluent Schema Registry- om Protobuf te definiëren.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Verifiëren bij een extern Confluent Schema Register

Om u te authenticeren bij een externe Confluent-Schema Registry, update uw schema registry-opties om authenticatie-credentials en API-sleutels op te nemen.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

TrustStore- en keystore-bestanden gebruiken in Unity Catalogvolumes

In Databricks Runtime 14.3 LTS en hoger kunt u truststore- en sleutelopslagbestanden in Unity Catalogvolumes gebruiken om te verifiëren bij een Confluent Schema Registry. Update uw schema registeropties aan de hand van het volgende voorbeeld:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Protobuf gebruiken met een descriptorbestand

U kunt ook verwijzen naar een protobuf descriptorbestand dat beschikbaar is voor uw rekencluster. Zorg ervoor dat u over de juiste machtigingen beschikt om het bestand te lezen, afhankelijk van de locatie.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Ondersteunde opties in Protobuf-functies

De volgende opties worden ondersteund in Protobuf-functies.

  • modus: bepaalt hoe fouten tijdens het deseriialiseren van Protobuf-records worden verwerkt. De fouten kunnen worden veroorzaakt door verschillende typen onjuiste gegevens, waaronder een afwijking tussen de werkelijke schema van het record en de verwachte schema die is opgegeven in from_protobuf().
    • Values:
      • FAILFAST(standaard): er wordt een fout gegenereerd wanneer er een onjuiste record wordt aangetroffen en de taak mislukt.
      • PERMISSIVE: Er wordt een NULL geretourneerd voor ongeldige records. Gebruik deze optie zorgvuldig, omdat dit kan leiden tot het verwijderen van veel records. Dit is handig wanneer een klein deel van de records in de bron onjuist is.
  • recursive.fields.max.depth: voegt ondersteuning toe voor recursieve velden. Spark SQL-schema's bieden geen ondersteuning voor recursieve velden. Wanneer deze optie niet is opgegeven, zijn recursieve velden niet toegestaan. Om recursieve velden in Protobufs te ondersteunen, moeten ze worden uitgebreid naar een opgegeven diepte.
    • Values:

      • -1 (standaard): Recursieve velden zijn niet toegestaan.

      • 0: Recursieve velden worden verwijderd.

      • 1: Hiermee staat u één niveau van recursie toe.

      • [2-10]: Geef een drempelwaarde op voor meerdere recursieniveaus, maximaal 10 niveaus.

        Als u een waarde instelt op meer dan 0, kunnen recursieve velden worden gebruikt door de geneste velden uit te breiden naar de geconfigureerde diepte. Values groter dan 10 zijn niet toegestaan om te voorkomen dat onbedoeld zeer grote schema's worden gemaakt. Als een Protobuf-bericht een diepte heeft die de geconfigureerde limitoverschrijdt, wordt de Spark-struct die wordt geretourneerd afgekapt na de recursie limit.

    • Voorbeeld: Bekijk een Protobuf met het volgende recursieve veld:

      message Person { string name = 1; Person friend = 2; }
      

      De volgende lijst toont de eind-schema met verschillende values voor deze instelling:

      • Optie set tot 1: STRUCT<name: STRING>
      • Optie set tot 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Optie set tot 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: Met deze optie kunt u Protobuf Any-velden converteren naar JSON. Deze functie moet zorgvuldig worden ingeschakeld. JSON-conversie en -verwerking zijn inefficiënt. Daarnaast verliest het JSON-tekenreeksveld Protobuf schema veiligheid waardoor downstreamverwerking gevoelig is voor fouten.
    • Values:

      • False (standaard): Tijdens runtime kunnen dergelijke jokertekenvelden willekeurige Protobuf-berichten bevatten als binaire gegevens. Dergelijke velden worden standaard verwerkt als een normaal Protobuf-bericht. Het heeft twee velden met schema(STRUCT<type_url: STRING, value: BINARY>). Standaard wordt het binaire value veld op geen enkele manier geïnterpreteerd. Maar de binaire gegevens zijn mogelijk niet handig in de praktijk om in sommige toepassingen te werken.
      • Waar: Als u deze waarde instelt op True, kunnen velden tijdens runtime worden geconverteerd Any naar JSON-tekenreeksen. Met deze optie wordt het binaire bestand geparseerd en wordt het Protobuf-bericht gedeserialiseerd in een JSON-tekenreeks.
    • Voorbeeld: Overweeg twee Protobuf-typen die als volgt zijn gedefinieerd:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Als deze optie is ingeschakeld, is de schema voor from_protobuf("col", messageName ="ProtoWithAny"): STRUCT<event_name: STRING, details: STRING>.

      Als het veld Protobuf-bericht bevatdetails, Person ziet de geretourneerde waarde er als volgt uit: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Vereisten:

      • De definities voor alle mogelijke Protobuf-typen die in velden worden gebruikt Any , moeten beschikbaar zijn in het Protobuf-descriptorbestand dat wordt doorgegeven aan from_protobuf().
      • Als Any Protobuf niet wordt gevonden, resulteert dit in een fout voor die record.
      • Deze functie wordt momenteel niet ondersteund met schema-registry.
  • emit.default.values: Hiermee wordt het mogelijk gemaakt om weergavevelden met nul values weer te geven bij het deserialiseren van Protobuf naar een Spark-struct. Deze optie moet spaarzaam worden gebruikt. Het is meestal niet raadzaam om afhankelijk te zijn van dergelijke fijnere verschillen in semantiek.
    • Values

      • False (standaard): Wanneer een veld leeg is in de geserialiseerde Protobuf, is het resulterende veld in de Spark-struct standaard null. Het is eenvoudiger om deze optie niet in te schakelen en te behandelen null als de standaardwaarde.
      • Juist: Wanneer deze optie is ingeschakeld, worden dergelijke velden gevuld met de bijbehorende standaardwaarde values.
    • Voorbeeld: Bekijk de volgende Protobuf met de Protobuf die is samengesteld als Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Met deze optie set op False, zou de Spark-struct na het aanroepen van from_protobuf() allemaal null-waarden zijn: {"name": null, "age": null, "middle_name": "", "salary": null}. Hoewel twee velden (age en middle_name) valuessethadden, bevat Protobuf ze niet in wire-format omdat ze standaard valueszijn.
      • Met deze optie set ingesteld op Waar, zou de Spark-structuur na het uitvoeren van from_protobuf(): {"name": "", "age": 0, "middle_name": "", "salary": null}zijn. Het salary veld blijft null omdat het expliciet wordt gedeclareerd optional en het is niet set in de invoerrecord.
  • enums.as.ints: als deze optie is ingeschakeld, worden opsommingsvelden in Protobuf weergegeven als gehele getallen in Spark.
    • Values

      • False (standaard)
      • Waar: wanneer deze optie is ingeschakeld, worden opsommingsvelden in Protobuf weergegeven als gehele getallen in Spark.
    • Voorbeeld: Bekijk de volgende Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Gegeven een Protobuf bericht zoals Person(job = ENGINEER):

      • Als deze optie is uitgeschakeld, zou de bijbehorende Spark-struct zijn {"job": "ENGINEER"}.
      • Als deze optie is ingeschakeld, zou de bijbehorende Spark-struct zijn {"job": 1}.

      U ziet dat de schema voor deze velden in elk geval anders is (geheel getal in plaats van standaardtekenreeks). Een dergelijke wijziging kan van invloed zijn op de schema van de downstream tables.

Schema Registeropties

De volgende schema registeropties zijn relevant bij het gebruik van schema register met Protobuf-functies.

  • schema.registry.subject
    • Vereist
    • Hier wordt het onderwerp voor schema in het Schema-register gespecificeerd, zoals 'client-event'.
  • schema.registry.address
    • Vereist
    • URL voor schema register, zoals https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Optioneel
    • Standaard: <NONE>.
    • Een schema-registervermelding voor een onderwerp kan meerdere Protobuf-definities bevatten, net als één proto bestand. Wanneer deze optie niet is opgegeven, wordt de eerste Protobuf gebruikt voor de schema. Geef de naam van het Protobuf-bericht op wanneer dit niet de eerste is in de vermelding. Denk bijvoorbeeld aan een vermelding met twee Protobuf-definities: 'Persoon' en 'Locatie' in die volgorde. Als de stream overeenkomt met 'Locatie' in plaats van 'Persoon', set deze optie op 'Locatie' (of de volledige naam, inclusief pakket 'com.example.protos.Location').
  • schema.registry.schema.evolution.mode
    • Standaard: 'opnieuw opstarten'.
    • Ondersteunde modi:
      • "opnieuw opstarten"
      • "geen"
    • Met deze optie stelt u schema-evolutiemodus voor from_protobuf()in. Aan het begin van een query registreert Spark de meest recente schema-id voor het opgegeven onderwerp. Dit bepaalt de schema voor from_protobuf(). Er kan een nieuwe schema worden gepubliceerd naar het schema register nadat de query is gestart. Wanneer een nieuwere schema-id wordt opgemerkt in een binnenkomend record, geeft dit een verandering aan in de schema. Met deze optie wordt bepaald hoe een dergelijke wijziging in schema wordt verwerkt:
      • herstart (standaard): Hiermee wordt een UnknownFieldException geactiveerd wanneer een nieuwere schema-id wordt opgemerkt. Hiermee wordt de query beëindigd. Databricks raadt u aan om taken te configureren om bij een queryfout te herstarten om schema wijzigingen op te halen.
      • geen: Schema-id-wijzigingen worden genegeerd. De records met een nieuwere schema-id worden verwerkt met dezelfde schema die aan het begin van de query is waargenomen. Nieuwere Protobuf-definities zijn naar verwachting compatibel met eerdere versies en nieuwe velden worden genegeerd.
  • confluent.schema.registry.<schema-registy-client-option>
    • Optioneel
    • Schema-registry maakt verbinding met Confluent schema-registry met behulp van de Confluent Schema Registry-client. Alle configuratieopties die door de client worden ondersteund, kunnen worden opgegeven met het voorvoegsel Confluent.schema.registry. De volgende twee instellingen bieden bijvoorbeeld USER_INFO verificatie credentials:
      • "confluent.schema.registry.basic.auth.credentials.source': 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"