Partilhar via


read_pubsub Função com valor de tabela de streaming

Aplica-se a: Marque Sim Databricks SQL Marque Sim Databricks Runtime 13.3 LTS e superior

Retorna uma tabela com registros lidos de Pub/Sub de um tópico. Suporta apenas consultas de streaming.

Sintaxe

read_pubsub( { parameter => value } [, ...])

Argumentos

read_pubsub requer invocação de parâmetro nomeado.

Os únicos argumentos necessários são subscriptionId, projectIde topicId. Todos os outros argumentos são opcionais.

Para obter descrições completas dos argumentos, consulte Configurar opções para leitura de streaming Pub/Sub.

A Databricks recomenda o uso de segredos ao fornecer opções de autorização. Ver função secreta.

Para obter detalhes sobre como configurar o acesso a Pub/Sub, consulte Configurar o acesso a Pub/Sub.

Parâmetro Tipo Description
subscriptionId STRING Obrigatório, o identificador exclusivo atribuído a uma assinatura Pub/Sub.
projectId STRING Obrigatório, o ID do projeto do Google Cloud associado ao tópico Pub/Sub.
topicId STRING Obrigatório, o ID ou o nome do tópico Pub/Sub para se inscrever.
clientEmail STRING O endereço de e-mail associado a uma conta de serviço para autenticação.
clientId STRING O ID do cliente associado à conta de serviço para autenticação.
privateKeyId STRING A ID da chave privada associada à conta de serviço.
privateKey STRING A chave privada associada à conta de serviço para autenticação.

Estes argumentos são usados para ajustes mais finos ao ler a partir de Pub/Sub:

Parâmetro Tipo Description
numFetchPartitions STRING Opcional com número padrão de executores. O número de tarefas paralelas do Spark que buscam registros de uma assinatura.
deleteSubscriptionOnStreamStop BOOLEAN Opcional com padrão false. Se definido como true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar.
maxBytesPerTrigger STRING Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado. O padrão é 'nenhum'.
maxRecordsPerFetch STRING O número de registros a serem buscados por tarefa antes de processar registros. O padrão é '1000'.
maxFetchPeriod STRING A duração do tempo para cada tarefa a ser buscada antes de processar registros. O padrão é '10s'.

Devoluções

Uma tabela de registros Pub/Sub com o esquema a seguir. A coluna de atributos pode ser nula, mas todas as outras colunas não são nulas.

Name Tipo de dados Pode ser nulo Standard Description
messageId STRING Não Identificador exclusivo da mensagem Pub/Sub.
payload BINARY Não O conteúdo da mensagem Pub/Sub.
attributes STRING Sim Pares chave-valor que representam os atributos da mensagem Pub/Sub. Esta é uma cadeia de caracteres codificada em json.
publishTimestampInMillis BIGINT Não O carimbo de data/hora quando a mensagem foi publicada, em milissegundos.
sequenceNumber BIGINT Não O identificador exclusivo do registro em seu fragmento.

Exemplos

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Os dados teriam agora de ser consultados a testing.streaming_table partir do para uma análise mais aprofundada.

Consultas erradas:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);