Condividi tramite


Iscriviti a Google Pub/Sub

Azure Databricks offre un connettore predefinito per sottoscrivere Google Pub/Sub in Databricks Runtime 13.3 LTS e versioni successive. Questo connettore fornisce semantica di elaborazione di tipo exactly-once per i record del sottoscrittore.

Nota

Pub/Sub potrebbe pubblicare record duplicati e i record potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere il codice di Azure Databricks per gestire i record duplicati e non ordinati.

Esempio di sintassi

Nell'esempio di codice seguente viene illustrata la sintassi di base per la configurazione di un flusso strutturato letto da Pub/Sub:

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // we will create a Pubsub subscription if none exists with this id
  .option("subscriptionId", "mysub") // required
  .option("topicId", "mytopic") // required
  .option("projectId", "myproject") // required
  .options(authOptions)
  .load()

Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.

Configurare l'accesso a Pub/Sub

Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Per autorizzare una connessione sono necessarie le opzioni seguenti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Nell'table seguente vengono descritti i ruoli necessari per l'credentialsconfigurato:

Ruoli Obbligatorio o facoltativo Modalità di utilizzo
roles/pubsub.viewer oppure roles/viewer Richiesto Controllare se esiste una sottoscrizione e la sottoscrizione get
roles/pubsub.subscriber Richiesto Recuperare i dati da una sottoscrizione
roles/pubsub.editor oppure roles/editor Facoltativo Abilita la creazione di una sottoscrizione se non esiste e consente anche l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso

Pub/Sub schema

Il schema per il flusso corrisponde ai record recuperati da Pub/Sub, come descritto nell'tableseguente:

Campo Type
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurare le opzioni per la lettura in streaming pub/sub

Nell'table seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni vengono configurate come parte di una lettura structured streaming usando .option("<optionName>", "<optionValue>") la sintassi.

Nota

Alcune opzioni di configurazione pub/sub usano il concetto di recupero invece di micro batch. Ciò riflette i dettagli dell'implementazione interna e le opzioni funzionano in modo analogo a quelle di altri connettori Structured Streaming, ad eccezione del fatto che i record vengono recuperati e quindi elaborati.

Opzione Default value Descrizione
numFetchPartitions Set a metà del numero degli esecutori presenti all'inizializzazione dello stream. Numero di attività Spark parallele che recuperano record da una sottoscrizione.
deleteSubscriptionOnStreamStop false Se true, la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming.
maxBytesPerTrigger Nessuno Un limit flessibile per l'elaborazione delle dimensioni del batch durante ogni micro batch attivato.
maxRecordsPerFetch 1000 Numero di record da recuperare per ogni attività prima dell'elaborazione dei record.
maxFetchPeriod 10 secondi Durata del recupero di ogni attività prima dell'elaborazione dei record. Databricks consiglia di usare il valore predefinito.

Semantica di elaborazione batch incrementale per Pub/Sub

È possibile usare Trigger.AvailableNow per utilizzare i record disponibili dalle origini Pub/Sub di un batch incrementale.

Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . I record elaborati dal batch includono tutti i dati recuperati in precedenza e tutti i record appena pubblicati con un timestamp minore del timestamp di inizio del flusso registrato.

Si veda Configurazione dell'elaborazione batch incrementale.

Monitoraggio delle metriche di streaming

Le metriche di stato di Structured Streaming segnalano il numero di record recuperati e pronti per l'elaborazione, le dimensioni dei record recuperati e pronti per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso. Di seguito è riportato un esempio di queste metriche:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limiti

L'esecuzione speculativa (spark.speculation) non è supportata con Pub/Sub.