Поделиться через


read_kinesis функция потоковой передачи табличного значения

Область применения: флажок Databricks SQL флажок Databricks Runtime 13.3 LTS и выше

Возвращает таблицу с записями, считываемыми из Kinesis из одного или нескольких потоков.

Синтаксис

read_kinesis ( { parameter => value } [, ...] )

Аргументы

read_kinesis требует вызова именованных параметров.

Единственным обязательным аргументом является streamName. Все остальные аргументы являются необязательными.

Ниже приведены краткие описания аргументов. Дополнительные сведения см. в документации Amazon Kinesis .

Существуют различные параметры подключения для подключения и проверки подлинности с помощью AWS. awsAccessKey, и awsSecretKey можно указать в аргументах функции с помощью секретной функции, вручную задать в аргументах или настроить в качестве переменных среды, как указано ниже. roleArn, roleExternalIDroleSessionName также можно использовать для проверки подлинности в AWS с помощью профилей экземпляров. Если ни одно из них не указано, оно будет использовать цепочку поставщиков AWS по умолчанию.

Параметр Тип Описание
streamName STRING Обязательный, разделенный запятыми список одного или нескольких потоков kinesis.
awsAccessKey STRING Ключ доступа AWS, если таковой есть. Можно также указать с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных по умолчанию AWS, включая переменные среды (AWS_ACCESS_KEY_ID) и файл профилей учетных данных.
awsSecretKey STRING Секретный ключ, соответствующий ключу доступа. Можно указать в аргументах или с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных AWS по умолчанию, включая переменные среды (AWS_SECRET_KEY или AWS_SECRET_ACCESS_KEY) и файл профилей учетных данных.
roleArn STRING Имя ресурса Amazon для предполагаемой роли при доступе к Kinesis.
roleExternalId STRING Используется при делегировании доступа к учетной записи AWS.
roleSessionName STRING Имя сеанса роли AWS.
stsEndpoint STRING Конечная точка для запроса учетных данных временного доступа.
region STRING Регион для потоков, которые нужно указать. По умолчанию используется локально разрешенный регион.
endpoint STRING региональная конечная точка для потоков данных Kinesis. По умолчанию используется локально разрешенный регион.
initialPosition STRING Начальная позиция для чтения из потока. Один из следующих: "последний" (по умолчанию), "trim_horizon", "самый ранний", "at_timestamp".
consumerMode STRING Одно из следующих: "опрос" (по умолчанию) или "EFO" (расширенный вентилятор).
consumerName STRING Имя потребителя. Все потребители префиксируются с помощью databricks_. Значением по умолчанию является пустая строка.
registerConsumerTimeoutInterval STRING Максимальное время ожидания ожидания для регистрации потребителя Kinesis EFO в потоке Kinesis перед вызовом ошибки. Значение по умолчанию — 300s.
requireConsumerDeregistration BOOLEAN true для отмены регистрации потребителя EFO при завершении запроса. По умолчанию — false.
deregisterConsumerTimeoutInterval STRING Максимальное время ожидания, чтобы потребитель Kinesis EFO был дерегистрирован с помощью потока Kinesis, прежде чем вызывать ошибку. Значение по умолчанию — 300s.
consumerRefreshInterval STRING Интервал, с которым проверяется и обновляется потребитель. Значение по умолчанию — 300s.

Следующие аргументы используются для управления пропускной способностью чтения и задержкой для Kinesis:

Параметр Тип Описание
maxRecordsPerFetch INTEGER (>0) Необязательный параметр, при использовании 10 000 записей для чтения для каждого запроса API в Kinesis.
maxFetchRate STRING Скорость предварительной выборки данных на сегмент. Значение между "1.0" и "2.0", которое измеряется в МБ/с. Значение по умолчанию — 1.0.
minFetchPeriod STRING Максимальное время ожидания между последовательными попытками предварительной выборки. Значение по умолчанию — 400 мс.
maxFetchDuration STRING Максимальная длительность буферизации предварительно подготовленных новых данных. Значение по умолчанию — 10s.
fetchBufferSize STRING Объем данных для следующего триггера. Значение по умолчанию — 20 гб.
shardsPerTask INTEGER (>0) Количество сегментов Kinesis для предварительного получения от параллельной задачи Spark. Значение по умолчанию равно 5.
shardFetchinterval STRING Как часто опрашивать для изменения сегментирования. Значение по умолчанию — "1s".
coalesceThresholdBlockSize INTEGER (>0) Пороговое значение, при котором происходит автоматическое объединение. Значение по умолчанию — 10 000 000.
coalesce BOOLEAN true для объединения предварительно подготовленных запросов. Значение по умолчанию — true.
coalesceBinSize INTEGER (>0) Приблизительный размер блока после объединения. Значение по умолчанию — 128 000 000.
reuseKinesisClient BOOLEAN true для повторного использования клиента Kinesis, хранящегося в кэше. Значение по умолчанию за true исключением кластера PE.
clientRetries INTEGER (>0) Количество повторных попыток в сценарии повторных попыток. Значение по умолчанию равно 5.

Возвраты

Таблица записей Kinesis со следующей схемой:

Имя. Тип данных Допускает значение NULL Стандартные Description
partitionKey STRING No Ключ, используемый для распределения данных между сегментами потока. Все записи данных с одним ключом секции будут считываться из одного сегмента.
data BINARY No Полезные данные kinesis, закодированные в кодировке Base-64.
stream STRING No Имя потока, из которого считывались данные.
shardId STRING No Уникальный идентификатор сегмента, из которого были считываются данные.
sequenceNumber BIGINT No Уникальный идентификатор записи в его сегменте.
approximateArrivalTimestamp TIMESTAMP No Приблизительное время вставки записи в поток.

Столбцы (stream, shardId, sequenceNumber) представляют собой первичный ключ.

Примеры

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');