read_kinesis
функция потоковой передачи табличного значения
Область применения: Databricks SQL Databricks Runtime 13.3 LTS и выше
Возвращает таблицу с записями, считываемыми из Kinesis из одного или нескольких потоков.
Синтаксис
read_kinesis ( { parameter => value } [, ...] )
Аргументы
read_kinesis
требует вызова именованных параметров.
Единственным обязательным аргументом является streamName
. Все остальные аргументы являются необязательными.
Ниже приведены краткие описания аргументов. Дополнительные сведения см. в документации Amazon Kinesis .
Существуют различные параметры подключения для подключения и проверки подлинности с помощью AWS.
awsAccessKey
, и awsSecretKey
можно указать в аргументах функции с помощью секретной функции, вручную задать в аргументах или настроить в качестве переменных среды, как указано ниже.
roleArn
, roleExternalID
roleSessionName
также можно использовать для проверки подлинности в AWS с помощью профилей экземпляров.
Если ни одно из них не указано, оно будет использовать цепочку поставщиков AWS по умолчанию.
Параметр | Тип | Описание |
---|---|---|
streamName |
STRING |
Обязательный, разделенный запятыми список одного или нескольких потоков kinesis. |
awsAccessKey |
STRING |
Ключ доступа AWS, если таковой есть. Можно также указать с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных по умолчанию AWS, включая переменные среды (AWS_ACCESS_KEY_ID ) и файл профилей учетных данных. |
awsSecretKey |
STRING |
Секретный ключ, соответствующий ключу доступа. Можно указать в аргументах или с помощью различных параметров, поддерживаемых цепочкой поставщиков учетных данных AWS по умолчанию, включая переменные среды (AWS_SECRET_KEY или AWS_SECRET_ACCESS_KEY ) и файл профилей учетных данных. |
roleArn |
STRING |
Имя ресурса Amazon для предполагаемой роли при доступе к Kinesis. |
roleExternalId |
STRING |
Используется при делегировании доступа к учетной записи AWS. |
roleSessionName |
STRING |
Имя сеанса роли AWS. |
stsEndpoint |
STRING |
Конечная точка для запроса учетных данных временного доступа. |
region |
STRING |
Регион для потоков, которые нужно указать. По умолчанию используется локально разрешенный регион. |
endpoint |
STRING |
региональная конечная точка для потоков данных Kinesis. По умолчанию используется локально разрешенный регион. |
initialPosition |
STRING |
Начальная позиция для чтения из потока. Один из следующих: "последний" (по умолчанию), "trim_horizon", "самый ранний", "at_timestamp". |
consumerMode |
STRING |
Одно из следующих: "опрос" (по умолчанию) или "EFO" (расширенный вентилятор). |
consumerName |
STRING |
Имя потребителя. Все потребители префиксируются с помощью databricks_. Значением по умолчанию является пустая строка. |
registerConsumerTimeoutInterval |
STRING |
Максимальное время ожидания ожидания для регистрации потребителя Kinesis EFO в потоке Kinesis перед вызовом ошибки. Значение по умолчанию — 300s. |
requireConsumerDeregistration |
BOOLEAN |
true для отмены регистрации потребителя EFO при завершении запроса. По умолчанию — false . |
deregisterConsumerTimeoutInterval |
STRING |
Максимальное время ожидания, чтобы потребитель Kinesis EFO был дерегистрирован с помощью потока Kinesis, прежде чем вызывать ошибку. Значение по умолчанию — 300s. |
consumerRefreshInterval |
STRING |
Интервал, с которым проверяется и обновляется потребитель. Значение по умолчанию — 300s. |
Следующие аргументы используются для управления пропускной способностью чтения и задержкой для Kinesis:
Параметр | Тип | Описание |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Необязательный параметр, при использовании 10 000 записей для чтения для каждого запроса API в Kinesis. |
maxFetchRate |
STRING |
Скорость предварительной выборки данных на сегмент. Значение между "1.0" и "2.0", которое измеряется в МБ/с. Значение по умолчанию — 1.0. |
minFetchPeriod |
STRING |
Максимальное время ожидания между последовательными попытками предварительной выборки. Значение по умолчанию — 400 мс. |
maxFetchDuration |
STRING |
Максимальная длительность буферизации предварительно подготовленных новых данных. Значение по умолчанию — 10s. |
fetchBufferSize |
STRING |
Объем данных для следующего триггера. Значение по умолчанию — 20 гб. |
shardsPerTask |
INTEGER (>0) |
Количество сегментов Kinesis для предварительного получения от параллельной задачи Spark. Значение по умолчанию равно 5. |
shardFetchinterval |
STRING |
Как часто опрашивать для изменения сегментирования. Значение по умолчанию — "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Пороговое значение, при котором происходит автоматическое объединение. Значение по умолчанию — 10 000 000. |
coalesce |
BOOLEAN |
true для объединения предварительно подготовленных запросов. Значение по умолчанию — true . |
coalesceBinSize |
INTEGER (>0) |
Приблизительный размер блока после объединения. Значение по умолчанию — 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true для повторного использования клиента Kinesis, хранящегося в кэше. Значение по умолчанию за true исключением кластера PE. |
clientRetries |
INTEGER (>0) |
Количество повторных попыток в сценарии повторных попыток. Значение по умолчанию равно 5. |
Возвраты
Таблица записей Kinesis со следующей схемой:
Имя. | Тип данных | Допускает значение NULL | Стандартные | Description |
---|---|---|---|---|
partitionKey |
STRING |
No | Ключ, используемый для распределения данных между сегментами потока. Все записи данных с одним ключом секции будут считываться из одного сегмента. | |
data |
BINARY |
No | Полезные данные kinesis, закодированные в кодировке Base-64. | |
stream |
STRING |
No | Имя потока, из которого считывались данные. | |
shardId |
STRING |
No | Уникальный идентификатор сегмента, из которого были считываются данные. | |
sequenceNumber |
BIGINT |
No | Уникальный идентификатор записи в его сегменте. | |
approximateArrivalTimestamp |
TIMESTAMP |
No | Приблизительное время вставки записи в поток. |
Столбцы (stream, shardId, sequenceNumber)
представляют собой первичный ключ.
Примеры
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');