read_pubsub
функция потоковой передачи табличного значения
Область применения: Databricks SQL Databricks Runtime 13.3 LTS и выше
Возвращает таблицу с записями, считываемыми из pub/Sub из раздела. Поддерживает только потоковые запросы.
Синтаксис
read_pubsub( { parameter => value } [, ...])
Аргументы
read_pubsub
требует вызова именованных параметров.
Единственными обязательными аргументами являются subscriptionId
, projectId
и topicId
. Все остальные аргументы являются необязательными.
Полные описания аргументов см. в разделе "Настройка параметров потоковой передачи pub/sub".
Databricks рекомендует использовать секреты при предоставлении параметров авторизации. См . секретную функцию.
Дополнительные сведения о настройке доступа к Pub/Sub см. в разделе "Настройка доступа к Pub/Sub".
Параметр | Тип | Описание |
---|---|---|
subscriptionId |
STRING |
Обязательный уникальный идентификатор, назначенный подписке Pub/Sub. |
projectId |
STRING |
Обязательный идентификатор проекта Google Cloud, связанный с разделом Pub/Sub. |
topicId |
STRING |
Обязательный идентификатор или имя раздела Pub/Sub для подписки. |
clientEmail |
STRING |
Адрес электронной почты, связанный с учетной записью службы для проверки подлинности. |
clientId |
STRING |
Идентификатор клиента, связанный с учетной записью службы для проверки подлинности. |
privateKeyId |
STRING |
Идентификатор закрытого ключа, связанного с учетной записью службы. |
privateKey |
STRING |
Закрытый ключ, связанный с учетной записью службы для проверки подлинности. |
Эти аргументы используются для дальнейшей точной настройки при чтении из Pub/Sub:
Параметр | Тип | Описание |
---|---|---|
numFetchPartitions |
STRING |
Необязательный параметр с числом исполнителей по умолчанию. Количество параллельных задач Spark, которые извлекает записи из подписки. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
Необязательный параметр по умолчанию false . Если задано значение true, подписка, переданная в поток, удаляется при завершении задания потоковой передачи. |
maxBytesPerTrigger |
STRING |
Обратимое ограничение для обработки размера пакета во время каждого запускаемого микропакета. Значение по умолчанию — none. |
maxRecordsPerFetch |
STRING |
Количество записей для получения каждой задачи перед обработкой записей. Значение по умолчанию — 1000. |
maxFetchPeriod |
STRING |
Длительность времени для каждой задачи, извлекаемой перед обработкой записей. Значение по умолчанию — 10s. |
Возвраты
Таблица записей Pub/Sub со следующей схемой. Столбец атрибутов может иметь значение NULL, но все остальные столбцы не имеют значения NULL.
Имя. | Тип данных | Допускает значение NULL | Стандартные | Описание |
---|---|---|---|---|
messageId |
STRING |
No | Уникальный идентификатор сообщения Pub/Sub. | |
payload |
BINARY |
No | Содержимое сообщения Pub/Sub. | |
attributes |
STRING |
Да | Пары "ключ-значение", представляющие атрибуты сообщения Pub/Sub. Это строка, закодированная в формате JSON. | |
publishTimestampInMillis |
BIGINT |
No | Метка времени публикации сообщения в миллисекундах. | |
sequenceNumber |
BIGINT |
No | Уникальный идентификатор записи в его сегменте. |
Примеры
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
Теперь данные должны запрашиваться из дальнейшего testing.streaming_table
анализа.
Ошибочные запросы:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);