Поделиться через


read_pubsub функция потоковой передачи табличного значения

Область применения:флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Возвращает таблицу с записями, считываемыми из pub/Sub из раздела. Поддерживает только потоковые запросы.

Синтаксис

read_pubsub( { parameter => value } [, ...])

Аргументы

read_pubsub требует вызова именованных параметров.

Единственными обязательными аргументами являются subscriptionId, projectIdи topicId. Все остальные аргументы являются необязательными.

Полные описания аргументов см. в разделе "Настройка параметров потоковой передачи pub/sub".

Databricks рекомендует использовать секреты при предоставлении параметров авторизации. См . секретную функцию.

Дополнительные сведения о настройке доступа к Pub/Sub см. в разделе "Настройка доступа к Pub/Sub".

Параметр Тип Описание
subscriptionId STRING Обязательный уникальный идентификатор, назначенный подписке Pub/Sub.
projectId STRING Обязательный идентификатор проекта Google Cloud, связанный с разделом Pub/Sub.
topicId STRING Обязательный идентификатор или имя раздела Pub/Sub для подписки.
clientEmail STRING Адрес электронной почты, связанный с учетной записью службы для проверки подлинности.
clientId STRING Идентификатор клиента, связанный с учетной записью службы для проверки подлинности.
privateKeyId STRING Идентификатор закрытого ключа, связанного с учетной записью службы.
privateKey STRING Закрытый ключ, связанный с учетной записью службы для проверки подлинности.

Эти аргументы используются для дальнейшей точной настройки при чтении из Pub/Sub:

Параметр Тип Описание
numFetchPartitions STRING Необязательный параметр с числом исполнителей по умолчанию. Количество параллельных задач Spark, которые извлекает записи из подписки.
deleteSubscriptionOnStreamStop BOOLEAN Необязательный параметр по умолчанию false. Если задано значение true, подписка, переданная в поток, удаляется при завершении задания потоковой передачи.
maxBytesPerTrigger STRING Обратимое ограничение для обработки размера пакета во время каждого запускаемого микропакета. Значение по умолчанию — none.
maxRecordsPerFetch STRING Количество записей для получения каждой задачи перед обработкой записей. Значение по умолчанию — 1000.
maxFetchPeriod STRING Длительность времени для каждой задачи, извлекаемой перед обработкой записей. Значение по умолчанию — 10s.

Возвраты

Таблица записей Pub/Sub со следующей схемой. Столбец атрибутов может иметь значение NULL, но все остальные столбцы не имеют значения NULL.

Имя. Тип данных Допускает значение NULL Стандартные Описание
messageId STRING No Уникальный идентификатор сообщения Pub/Sub.
payload BINARY No Содержимое сообщения Pub/Sub.
attributes STRING Да Пары "ключ-значение", представляющие атрибуты сообщения Pub/Sub. Это строка, закодированная в формате JSON.
publishTimestampInMillis BIGINT No Метка времени публикации сообщения в миллисекундах.
sequenceNumber BIGINT No Уникальный идентификатор записи в его сегменте.

Примеры

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

Теперь данные должны запрашиваться из дальнейшего testing.streaming_table анализа.

Ошибочные запросы:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);