Condividi tramite


Buffer del protocollo di lettura e scrittura

Azure Databricks offre supporto nativo per la serializzazione e la deserializzazione tra gli struct apache Spark e i buffer di protocollo (protobuf). Il supporto protobuf viene implementato come trasformatore di dataframe Apache Spark e può essere usato con Structured Streaming o per le operazioni batch.

Come deserializzare e serializzare i buffer dei protocolli

In Databricks Runtime 12.2 LTS e versioni successive è possibile usare from_protobuf funzioni e to_protobuf per serializzare e deserializzare i dati. La serializzazione protobuf viene comunemente usata nei carichi di lavoro di streaming.

La sintassi di base per le funzioni protobuf è simile per le funzioni di lettura e scrittura. È necessario importare queste funzioni prima di usarle.

from_protobuf effettua il cast di una colonna binaria in una struct e to_protobuf effettua il cast di una colonna struct in binario. È necessario fornire un registro schemi specificato con l'argomento options o un file descrittore identificato dall'argomento descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Gli esempi seguenti illustrano l'elaborazione di record protobuf binari con from_protobuf() e la conversione dello struct Spark SQL in protobuf binario con to_protobuf().

Usare protobuf con Registro degli Schemi Confluent

Azure Databricks supporta l'uso del Registro schemi Confluent per definire Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

eseguire l'autenticazione in un registro dello schema confluente esterno

Per eseguire l'autenticazione in un Registro schemi Confluent esterno, aggiornare le opzioni del Registro di sistema dello schema in modo da includere le credenziali di autenticazione e le chiavi API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Usare file truststore e keystore dentro i volumi di Unity Catalog

In Databricks Runtime 14.3 LTS e versioni successive è possibile usare i file truststore e keystore nei volumi del catalogo Unity per eseguire l'autenticazione in un Registro schemi Confluent. Aggiorna le tue opzioni del registro dello schema secondo l'esempio seguente:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Usare Protobuf con un file descrittore

È anche possibile fare riferimento a un file di descrittore protobuf disponibile per il cluster di calcolo. Assicurarsi di disporre delle autorizzazioni appropriate per leggere il file, a seconda del percorso.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Opzioni supportate nelle funzioni Protobuf

Le opzioni seguenti sono supportate nelle funzioni Protobuf.

  • mode: determina la modalità di gestione degli errori durante la deserializzazione dei record Protobuf. Gli errori possono essere causati da vari tipi di record in formato non valido, tra cui una mancata corrispondenza tra lo schema effettivo del record e lo schema previsto fornito in from_protobuf().
    • valori:
      • FAILFAST(impostazione predefinita): viene generato un errore quando viene rilevato un record in formato non valido e l'attività non riesce.
      • PERMISSIVE: viene restituito un valore NULL per i record in formato non valido. Usare questa opzione con attenzione perché può comportare l'eliminazione di molti record. Ciò è utile quando una piccola frazione dei record nell'origine non è corretta.
  • recursive.fields.max.depth: aggiunge il supporto per i campi ricorsivi. Gli schemi SQL spark non supportano campi ricorsivi. Quando questa opzione non è specificata, i campi ricorsivi non sono consentiti. Per supportare i campi ricorsivi in Protobufs, è necessario espandersi fino a una profondità specificata.
    • Valori:

      • -1 (impostazione predefinita): i campi ricorsivi non sono consentiti.

      • 0: i campi ricorsivi vengono eliminati.

      • 1: consente un singolo livello di ricorsione.

      • [2-10]: specificare una soglia per più ricorsioni, fino a 10 livelli.

        L'impostazione di un valore su maggiore di 0 consente campi ricorsivi espandendo i campi annidati fino alla profondità configurata. I valori maggiori di 10 non sono consentiti per evitare inavvertitamente la creazione di schemi molto grandi. Se un messaggio Protobuf ha una profondità maggiore del limite configurato, la struttura Spark restituita viene troncata al raggiungimento del limite di ricorsione.

    • Esempio: si consideri un protobuf con il campo ricorsivo seguente:

      message Person { string name = 1; Person friend = 2; }
      

      Di seguito è riportato l'elenco dello schema finale con valori diversi per questa impostazione:

      • Opzione impostata su 1: STRUCT<name: STRING>
      • Opzione impostata su 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Opzione impostata su 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: questa opzione consente di convertire i campi Protobuf in JSON. Questa funzionalità deve essere abilitata attentamente. La conversione e l'elaborazione JSON sono inefficienti. Inoltre, il campo stringa JSON perde la sicurezza dello schema Protobuf rendendo l'elaborazione downstream soggetta a errori.
    • Valori:

      • False (impostazione predefinita): in fase di esecuzione, tali campi con caratteri jolly possono contenere messaggi Protobuf arbitrari come dati binari. Per impostazione predefinita, tali campi vengono gestiti come un normale messaggio Protobuf. Ha due campi con schema (STRUCT<type_url: STRING, value: BINARY>). Per impostazione predefinita, il campo binario value non viene interpretato in alcun modo. Tuttavia, i dati binari potrebbero non essere utili in pratica per funzionare in alcune applicazioni.
      • True: l'impostazione di questo valore su True consente la conversione dei Any campi in stringhe JSON in fase di esecuzione. Con questa opzione, il file binario viene analizzato e il messaggio Protobuf viene deserializzato in una stringa JSON.
    • Esempio: considerare due tipi Protobuf definiti come segue:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Con questa opzione abilitata, lo schema per from_protobuf("col", messageName ="ProtoWithAny") sarà: STRUCT<event_name: STRING, details: STRING>.

      In fase di esecuzione, se details il campo contiene Person il messaggio Protobuf, il valore restituito sarà simile al seguente: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Requisiti:

      • Le definizioni per tutti i possibili tipi Protobuf usati nei Any campi devono essere disponibili nel file descrittore Protobuf passato a from_protobuf().
      • Se Any Protobuf non viene trovato, verrà generato un errore per tale record.
      • Questa funzionalità attualmente non è supportata da schema-registry.
  • emit.default.values: abilita il rendering dei campi con valori zero durante la deserializzazione di Protobuf in uno struct Spark. Questa opzione deve essere usata con moderazione. In genere non è consigliabile dipendere da tali differenze più sottili nella semantica.
    • valori

      • False (impostazione predefinita): quando un campo è vuoto nel Protobuf serializzato, il campo risultante nello struct Spark è null per impostazione predefinita. È più semplice non abilitare questa opzione e considerare null come valore predefinito.
      • True: quando questa opzione è abilitata, tali campi vengono compilati con i valori predefiniti corrispondenti.
    • Esempio: si consideri il protobuf seguente con il Protobuf costruito come Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Con questa opzione impostata su False, la struttura Spark dopo aver chiamato from_protobuf() sarà tutta nulla: {"name": null, "age": null, "middle_name": "", "salary": null}. Anche se due campi (age e middle_name) avevano valori impostati, Protobuf non li include in formato wire poiché sono valori predefiniti.
      • Con questa opzione impostata su True, lo struct Spark dopo aver chiamato from_protobuf() sarà: {"name": "", "age": 0, "middle_name": "", "salary": null}. Il campo salary rimane Null perché viene dichiarato in modo esplicito optional e non è impostato nel record di input.
  • enums.as.ints: se abilitata, i campi enumerazione in Protobuf vengono visualizzati come campi integer in Spark.
    • valori

      • False (impostazione predefinita)
      • True: se abilitata, il rendering dei campi enumerazione in Protobuf viene eseguito come campi integer in Spark.
    • Esempio: Si consideri il protobuf seguente:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Dato un messaggio Protobuf come Person(job = ENGINEER):

      • Con questa opzione disabilitata, lo struct Spark corrispondente sarà {"job": "ENGINEER"}.
      • Con questa opzione abilitata, lo struct Spark corrispondente sarà {"job": 1}.

      Si noti che lo schema per questi campi è diverso in ogni caso (integer anziché stringa predefinita). Tale modifica può influire sullo schema delle tabelle downstream.

Opzioni di Schema Registry

Le opzioni del registro degli schemi seguenti sono pertinenti quando si usa il registro degli schemi con le funzioni Protobuf.

  • schema.registry.subject
    • Richiesto
    • Specifica l'oggetto per lo schema nel Registro schemi, ad esempio "client-event"
  • schema.registry.address
    • Richiesto
    • URL del registro degli schemi, ad esempio https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Facoltativo
    • Impostazione predefinita: <NONE>.
    • Una registrazione del registro degli schemi per un soggetto può contenere più definizioni Protobuf, proprio come un singolo file proto. Quando questa opzione non viene specificata, il primo Protobuf viene usato per lo schema. Specificare il nome del messaggio Protobuf quando non è il primo nella voce. Si consideri, ad esempio, una voce con due definizioni Protobuf: "Person" e "Location" in tale ordine. Se il flusso corrisponde a "Location" invece che a "Person", imposta questa opzione su "Location" (o sul suo nome completo, incluso il pacchetto "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Impostazione predefinita: "restart".
    • Modalità supportate:
      • "restart"
      • "nessuno"
    • Questa opzione imposta la modalità di evoluzione dello schema per from_protobuf(). All'inizio di una query, Spark registra l'ID dello schema più recente per l'oggetto specificato. Questo determina lo schema per from_protobuf(). Un nuovo schema potrebbe essere pubblicato nel registro degli schemi dopo l'avvio della query. Quando un ID schema più recente viene notato in un record in ingresso, indica una modifica allo schema. Questa opzione determina come viene gestita tale modifica allo schema:
      • riavviare (impostazione predefinita): attiva un UnknownFieldException quando viene notato un ID dello schema più recente. Questa operazione termina la query. Databricks consiglia di configurare i processi da riavviare in caso di errore di query per recepire le modifiche allo schema.
      • nessun: le modifiche dell'ID dello schema vengono ignorate. I record con ID schema più recente vengono analizzati con lo stesso schema osservato all'inizio della query. Le definizioni Protobuf più recenti devono essere compatibili con le versioni precedenti e i nuovi campi vengono ignorati.
  • confluent.schema.registry.<schema-registy-client-option>
    • Facoltativo
    • Il registro degli schemi si connette al registro degli schemi Confluent utilizzando il client del registro degli schemi Confluent. Qualsiasi opzione di configurazione supportata dal client può essere specificata con il prefisso "confluent.schema.registry". Ad esempio, le due impostazioni seguenti forniscono le credenziali di autenticazione "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"