read_pubsub
Função com valor de tabela de streaming
Aplica-se a: Databricks SQL Databricks Runtime 13.3 LTS e superior
Retorna uma tabela com registros lidos de Pub/Sub de um tópico. Suporta apenas consultas de streaming.
Sintaxe
read_pubsub( { parameter => value } [, ...])
Argumentos
read_pubsub
requer invocação de parâmetro nomeado.
Os únicos argumentos necessários são subscriptionId
, projectId
e topicId
. Todos os outros argumentos são opcionais.
Para obter descrições completas dos argumentos, consulte Configurar opções para leitura de streaming Pub/Sub.
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. Ver função secreta.
Para obter detalhes sobre como configurar o acesso a Pub/Sub, consulte Configurar o acesso a Pub/Sub.
Parâmetro | Tipo | Description |
---|---|---|
subscriptionId |
STRING |
Obrigatório, o identificador exclusivo atribuído a uma assinatura Pub/Sub. |
projectId |
STRING |
Obrigatório, o ID do projeto do Google Cloud associado ao tópico Pub/Sub. |
topicId |
STRING |
Obrigatório, o ID ou o nome do tópico Pub/Sub para se inscrever. |
clientEmail |
STRING |
O endereço de e-mail associado a uma conta de serviço para autenticação. |
clientId |
STRING |
O ID do cliente associado à conta de serviço para autenticação. |
privateKeyId |
STRING |
A ID da chave privada associada à conta de serviço. |
privateKey |
STRING |
A chave privada associada à conta de serviço para autenticação. |
Estes argumentos são usados para ajustes mais finos ao ler a partir de Pub/Sub:
Parâmetro | Tipo | Description |
---|---|---|
numFetchPartitions |
STRING |
Opcional com número padrão de executores. O número de tarefas paralelas do Spark que buscam registros de uma assinatura. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Opcional com padrão false . Se definido como true, a assinatura passada para o fluxo será excluída quando o trabalho de streaming terminar. |
maxBytesPerTrigger |
STRING |
Um limite suave para o tamanho do lote a ser processado durante cada microlote acionado. O padrão é 'nenhum'. |
maxRecordsPerFetch |
STRING |
O número de registros a serem buscados por tarefa antes de processar registros. O padrão é '1000'. |
maxFetchPeriod |
STRING |
A duração do tempo para cada tarefa a ser buscada antes de processar registros. O padrão é '10s'. |
Devoluções
Uma tabela de registros Pub/Sub com o esquema a seguir. A coluna de atributos pode ser nula, mas todas as outras colunas não são nulas.
Name | Tipo de dados | Pode ser nulo | Standard | Description |
---|---|---|---|---|
messageId |
STRING |
Não | Identificador exclusivo da mensagem Pub/Sub. | |
payload |
BINARY |
Não | O conteúdo da mensagem Pub/Sub. | |
attributes |
STRING |
Sim | Pares chave-valor que representam os atributos da mensagem Pub/Sub. Esta é uma cadeia de caracteres codificada em json. | |
publishTimestampInMillis |
BIGINT |
Não | O carimbo de data/hora quando a mensagem foi publicada, em milissegundos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro em seu fragmento. |
Exemplos
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Os dados teriam agora de ser consultados a testing.streaming_table
partir do para uma análise mais aprofundada.
Consultas erradas:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);