Inscrever-se no Google Pub/Sub
O Azure Databricks fornece um conector interno para assinar o Google Pub/Sub no Databricks Runtime 13.3 LTS e superior. Este conector fornece semântica de processamento exatamente uma vez para registros do assinante.
Nota
Pub/Sub pode publicar registros duplicados e os registros podem chegar ao assinante fora de ordem. Você deve escrever o código do Azure Databricks para lidar com registros duplicados e fora de ordem.
Exemplo de sintaxe
O exemplo de código a seguir demonstra a sintaxe básica para configurar um Structured Streaming lido de Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para leitura de streaming Pub/Sub.
Configurar o acesso a Pub/Sub
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmail
clientId
privateKey
privateKeyId
A tabela a seguir descreve as funções necessárias para as credenciais configuradas:
Funções | Obrigatório ou opcional | Como é utilizado |
---|---|---|
roles/pubsub.viewer ou roles/viewer |
Necessário | Verifique se existe uma subscrição e obtenha uma subscrição |
roles/pubsub.subscriber |
Necessário | Buscar dados de uma assinatura |
roles/pubsub.editor ou roles/editor |
Opcional | Permite a criação de uma assinatura, caso não exista, e também permite o deleteSubscriptionOnStreamStop uso da para excluir assinaturas na terminação de fluxo |
Esquema Pub/Sub
O esquema para o fluxo corresponde aos registros que são buscados de Pub/Sub, conforme descrito na tabela a seguir:
Campo | Type |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Configurar opções para leitura de streaming Pub/Sub
A tabela a seguir descreve as opções suportadas para Pub/Sub. Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando .option("<optionName>", "<optionValue>")
sintaxe.
Nota
Algumas opções de configuração Pub/Sub usam o conceito de buscas em vez de microlotes. Isso reflete os detalhes internos da implementação, e as opções funcionam de forma semelhante aos corolários em outros conectores de Streaming Estruturado, exceto que os registros são buscados e, em seguida, processados.
Opção | Valor predefinido | Description |
---|---|---|
numFetchPartitions |
Defina como metade do número de executores presentes na inicialização do fluxo. | O número de tarefas paralelas do Spark que buscam registros de uma assinatura. |
deleteSubscriptionOnStreamStop |
false |
Se true o , a assinatura passada para o fluxo for excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
nenhum | Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado. |
maxRecordsPerFetch |
1000 | O número de registros a serem buscados por tarefa antes de processar registros. |
maxFetchPeriod |
10 segundos | A duração do tempo para cada tarefa a ser buscada antes de processar registros. O Databricks recomenda o uso do valor padrão. |
Semântica incremental de processamento em lote para Pub/Sub
Você pode usar Trigger.AvailableNow
para consumir registros disponíveis das fontes Pub/Sub um lote incremental.
O Azure Databricks registra o carimbo de data/hora quando você inicia uma leitura com a Trigger.AvailableNow
configuração. Os registros processados pelo lote incluem todos os dados obtidos anteriormente e quaisquer registros recém-publicados com um carimbo de data/hora menor do que o carimbo de data/hora de início do fluxo gravado.
Consulte Configurando o processamento incremental em lote.
Monitoramento de métricas de streaming
As métricas de progresso do Streaming estruturado relatam o número de registros buscados e prontos para processar, o tamanho dos registros buscados e prontos para processar e o número de duplicatas vistas desde o início do fluxo. Segue-se um exemplo destas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
A execução especulativa (spark.speculation
) não é suportada com Pub/Sub.