Iscriviti a Google Pub/Sub
Azure Databricks offre un connettore predefinito per sottoscrivere Google Pub/Sub in Databricks Runtime 13.3 LTS e versioni successive. Questo connettore fornisce semantica di elaborazione di tipo exactly-once per i record del sottoscrittore.
Nota
Pub/Sub potrebbe pubblicare record duplicati e i record potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere il codice di Azure Databricks per gestire i record duplicati e non ordinati.
Esempio di sintassi
Nell'esempio di codice seguente viene illustrata la sintassi di base per la configurazione di un flusso strutturato letto da Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.
Configurare l'accesso a Pub/Sub
Databricks consiglia di usare i segreti quando si forniscono opzioni di autorizzazione. Per autorizzare una connessione sono necessarie le opzioni seguenti:
clientEmail
clientId
privateKey
privateKeyId
Nell'table seguente vengono descritti i ruoli necessari per l'credentialsconfigurato:
Ruoli | Obbligatorio o facoltativo | Modalità di utilizzo |
---|---|---|
roles/pubsub.viewer oppure roles/viewer |
Richiesto | Controllare se esiste una sottoscrizione e la sottoscrizione get |
roles/pubsub.subscriber |
Richiesto | Recuperare i dati da una sottoscrizione |
roles/pubsub.editor oppure roles/editor |
Facoltativo | Abilita la creazione di una sottoscrizione se non esiste e consente anche l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso |
Pub/Sub schema
Il schema per il flusso corrisponde ai record recuperati da Pub/Sub, come descritto nell'tableseguente:
Campo | Type |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurare le opzioni per la lettura in streaming pub/sub
Nell'table seguente vengono descritte le opzioni supportate per Pub/Sub. Tutte le opzioni vengono configurate come parte di una lettura structured streaming usando .option("<optionName>", "<optionValue>")
la sintassi.
Nota
Alcune opzioni di configurazione pub/sub usano il concetto di recupero invece di micro batch. Ciò riflette i dettagli dell'implementazione interna e le opzioni funzionano in modo analogo a quelle di altri connettori Structured Streaming, ad eccezione del fatto che i record vengono recuperati e quindi elaborati.
Opzione | Default value | Descrizione |
---|---|---|
numFetchPartitions |
Set a metà del numero degli esecutori presenti all'inizializzazione dello stream. | Numero di attività Spark parallele che recuperano record da una sottoscrizione. |
deleteSubscriptionOnStreamStop |
false |
Se true , la sottoscrizione passata al flusso viene eliminata al termine del processo di streaming. |
maxBytesPerTrigger |
Nessuno | Un limit flessibile per l'elaborazione delle dimensioni del batch durante ogni micro batch attivato. |
maxRecordsPerFetch |
1000 | Numero di record da recuperare per ogni attività prima dell'elaborazione dei record. |
maxFetchPeriod |
10 secondi | Durata del recupero di ogni attività prima dell'elaborazione dei record. Databricks consiglia di usare il valore predefinito. |
Semantica di elaborazione batch incrementale per Pub/Sub
È possibile usare Trigger.AvailableNow
per utilizzare i record disponibili dalle origini Pub/Sub di un batch incrementale.
Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow
. I record elaborati dal batch includono tutti i dati recuperati in precedenza e tutti i record appena pubblicati con un timestamp minore del timestamp di inizio del flusso registrato.
Si veda Configurazione dell'elaborazione batch incrementale.
Monitoraggio delle metriche di streaming
Le metriche di stato di Structured Streaming segnalano il numero di record recuperati e pronti per l'elaborazione, le dimensioni dei record recuperati e pronti per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso. Di seguito è riportato un esempio di queste metriche:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limiti
L'esecuzione speculativa (spark.speculation
) non è supportata con Pub/Sub.