Compartir a través de


Función con valores de tabla de transmisión por secuencias read_pubsub

Se aplica a: casilla marcada como sí Databricks SQL casilla marcada como Sí Databricks Runtime 13.3 LTS y versiones posteriores

Devuelve una tabla con registros leídos de Pub/Sub de un tema. Solo admite consultas de streaming.

Sintaxis

read_pubsub( { parameter => value } [, ...])

Argumentos

read_pubsub requiere una invocación de parámetros con nombre.

Los únicos argumentos necesarios son subscriptionId, projectId, y topicId. Todos los demás argumentos son opcionales.

Para obtener descripciones de argumentos completas, vea Opciones de configuración para la lectura de secuencias pub/sub.

Databricks recomienda usar secretos al proporcionar opciones de autorización. Vea función secreta.

Para obtener más información sobre cómo configurar el acceso a Pub/Sub, vea Configurar el acceso a Pub/Sub.

Parámetro Tipo Descripción
subscriptionId STRING Obligatorio, el identificador único asignado a una suscripción de Pub/Sub.
projectId STRING Obligatorio, el identificador de proyecto de Google Cloud asociado al tema Pub/Sub.
topicId STRING Obligatorio, el identificador o el nombre del tema Pub/Sub al que suscribirse.
clientEmail STRING Dirección de correo electrónico asociada a una cuenta de servicio para la autenticación.
clientId STRING Identificador de cliente asociado a la cuenta de servicio para la autenticación.
privateKeyId STRING Identificador de la clave privada asociada a la cuenta de servicio.
privateKey STRING Clave privada asociada a la cuenta de servicio para la autenticación.

Estos argumentos se usan para un ajuste más preciso al leer desde Pub/Sub:

Parámetro Tipo Descripción
numFetchPartitions STRING Opcional con el número predeterminado de ejecutores. Número de tareas paralelas de Spark que capturan registros de una suscripción.
deleteSubscriptionOnStreamStop BOOLEAN Opcional con predeterminadfalse. Si se establece en true, la suscripción pasada a la secuencia se elimina cuando finaliza el trabajo de streaming.
maxBytesPerTrigger STRING Límite flexible para el tamaño del lote que se va a procesar durante cada microlote desencadenado. El valor predeterminado no es ‘ninguno’.
maxRecordsPerFetch STRING Número de registros que se van a capturar por tarea antes de procesar los registros. El valor predeterminado es ‘1000’.
maxFetchPeriod STRING Duración de tiempo para cada tarea que se va a capturar antes de procesar los registros. El valor predeterminado es "10 s".

Devoluciones

Tabla de registros Pub/Sub con el esquema siguiente. La columna de atributos podría ser null, pero todas las demás columnas no son null.

Nombre Tipo de datos Nullable Estándar Descripción
messageId STRING No Identificador único del mensaje Pub/Sub.
payload BINARY No Contenido del mensaje Pub/Sub.
attributes STRING Pares clave-valor que representan los atributos del mensaje Pub/Sub. Se trata de una cadena codificada en json.
publishTimestampInMillis BIGINT No Marca de tiempo cuando se publicó el mensaje, en milisegundos.
sequenceNumber BIGINT No Identificador único del registro dentro de su partición.

Ejemplos

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Los datos ahora deben consultarse desde el testing.streaming_table para realizar un análisis posterior.

Consultas erróneas:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);