Compartilhar via


função de streaming com valor de tabela read_pubsub

Aplica-se a:verificação marcada como sim SQL do Databricks verificação marcada como sim Databricks Runtime 13.3 LTS e versões posteriores

Retorna uma tabela com registros lidos do Pub/Sub de um tópico. Só é compatível com consultas de streaming.

Sintaxe

read_pubsub( { parameter => value } [, ...])

Argumentos

read_pubsub requer invocação de parâmetro nomeado.

Os únicos argumentos necessários são subscriptionId, projectId e topicId. Todos os outros argumentos são opcionais.

Para descrições completas dos argumentos, veja Configurar opções para de leitura de streaming do Pub/Sub.

O Databricks recomenda o uso de segredos ao fornecer opções de autorização. Confira função secreta.

Para mais detalhes sobre a configuração do acesso ao Pub/Sub, confira Configurar o acesso ao Pub/Sub.

Parâmetro Tipo Descrição
subscriptionId STRING Obrigatório, o identificador exclusivo atribuído a uma assinatura do Pub/Sub.
projectId STRING Obrigatório, a ID do projeto do Google Cloud associada ao tópico do Pub/Sub.
topicId STRING Obrigatório, a ID ou o nome do tópico do Pub/Sub para se inscrever.
clientEmail STRING O endereço de email associado a uma conta de serviço para autenticação.
clientId STRING A ID do cliente associada à conta de serviço para autenticação.
privateKeyId STRING A ID da chave privada associada à conta de serviço.
privateKey STRING A chave privada associada à conta de serviço para autenticação.

Esses argumentos são usados para ajustes adicionais ao ler do Pub/Sub:

Parâmetro Tipo Descrição
numFetchPartitions STRING Opcional com número padrão de executores. O número de tarefas paralelas do Spark que buscam registros de uma assinatura.
deleteSubscriptionOnStreamStop BOOLEAN Opcional com padrão false. Se definido como verdadeiro, a assinatura passada para o stream será excluída quando o trabalho de streaming terminar.
maxBytesPerTrigger STRING Um limite flexível para o tamanho do lote a ser processado durante cada microlote disparado. O padrão é "none".
maxRecordsPerFetch STRING O número de registros a serem buscados por tarefa antes do processamento de registros. O padrão é "1000".
maxFetchPeriod STRING A duração do tempo para cada tarefa a ser buscada antes do processamento de registros. O padrão é ’’10s’’.

Retornos

Uma tabela de registros do Pub/Sub com o seguinte esquema. A coluna de atributos pode ser nula, mas todas as outras colunas não são nulas.

Nome Tipo de dados Nullable Standard Descrição
messageId STRING Não Identificador único para a mensagem do Pub/Sub.
payload BINARY Não O conteúdo da mensagem do Pub/Sub.
attributes STRING Sim Pares chave-valor representando os atributos da mensagem do Pub/Sub. Esta é uma string codificada em json.
publishTimestampInMillis BIGINT Não O carimbo de data/hora de quando a mensagem foi publicada, em milissegundos.
sequenceNumber BIGINT Não O identificador exclusivo do registro dentro de seu fragmento.

Exemplos

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Agora, os dados precisariam ser consultados da testing.streaming_table para análises adicionais.

Consultas errôneas:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);