read_kinesis
流式表值函数
适用于: Databricks SQL Databricks Runtime 13.3 LTS 及更高版本
返回一个表,其中包含从一个或多个流的 Kinesis 中读取的记录。
语法
read_kinesis ( { parameter => value } [, ...] )
参数
read_kinesis
需要命名参数调用。
唯一必需的参数为 streamName
。 所有其他参数都是可选的。
此处对参数的说明很简短。 有关详细信息,请参阅 Amazon Kinesis 文档。
可通过多个连接选项来连接 AWS 并对其进行身份验证。
可以使用 secret 函数在函数参数中指定 awsAccessKey
和 awsSecretKey
,也可以在参数中手动设置,或按如下所示配置为环境变量。
roleArn
、roleExternalID
、roleSessionName
也可通过使用实例配置文件用于对 AWS 进行身份验证。
如果未指定这些项,它将使用默认的 AWS 提供程序链。
参数 | 类型 | 描述 |
---|---|---|
streamName |
STRING |
一个或多个 Kinesis 流的必需逗号分隔列表。 |
awsAccessKey |
STRING |
AWS 访问密钥(如果有)。 还可以通过 AWS 默认凭据提供程序链支持的各种选项进行指定,包括环境变量(AWS_ACCESS_KEY_ID )和凭据配置文件。 |
awsSecretKey |
STRING |
对应于访问密钥的密钥。 可以在参数中指定,也可以通过 AWS 默认凭据提供程序链支持的各种选项指定,包括环境变量(AWS_SECRET_KEY 或 AWS_SECRET_ACCESS_KEY )以及凭据配置文件。 |
roleArn |
STRING |
访问 Kinesis 时要承担的角色的 Amazon 资源名称。 |
roleExternalId |
STRING |
在委派对 AWS 帐户的访问权限时使用。 |
roleSessionName |
STRING |
AWS 角色会话名称。 |
stsEndpoint |
STRING |
用于请求临时访问凭据的终结点。 |
region |
STRING |
要指定的流的区域。 默认值为本地解析区域。 |
endpoint |
STRING |
Kinesis 数据流的区域终结点。 默认值为本地解析区域。 |
initialPosition |
STRING |
从流中读取的起始位置。 以下之一:“latest”(默认)、“trim_horizon”、“earliest”、“at_timestamp”。 |
consumerMode |
STRING |
以下之一:“polling”(默认),或“EFO”(增强型扇出)。 |
consumerName |
STRING |
使用者的名称。 所有使用者以“databricks_”为前缀。 默认值为空字符串。 |
registerConsumerTimeoutInterval |
STRING |
在引发错误之前等待 Kinesis EFO 使用者注册到 Kinesis 流的最大超时。 默认值为“300s”。 |
requireConsumerDeregistration |
BOOLEAN |
为 true 则在查询终止时注册 EFO 使用者注册。 默认值为 false 。 |
deregisterConsumerTimeoutInterval |
STRING |
在引发错误之前等待 Kinesis EFO 使用者在 Kinesis 流中取消注册的最大超时。 默认值为“300s”。 |
consumerRefreshInterval |
STRING |
检查和刷新使用者的间隔。 默认值为“300s”。 |
以下参数用于控制 Kinesis 的读取吞吐量和延迟:
参数 | 类型 | 描述 |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
可选,默认值为 10,000,按 API 对 Kinesis 的请求读取的记录。 |
maxFetchRate |
STRING |
每个分片预提取数据的速度。 介于“1.0”和“2.0”之间的值,以 MB/秒为单位。 默认值为“1.0”。 |
minFetchPeriod |
STRING |
连续预提取尝试之间的最长等待时间。 默认值为“400ms”。 |
maxFetchDuration |
STRING |
缓冲预提取的新数据的最长持续时间。 默认值为“10s”。 |
fetchBufferSize |
STRING |
下一个触发器的数据量。 默认值为“20gb”。 |
shardsPerTask |
INTEGER (>0) |
每个 Spark 任务要并行预提取的 Kinesis 分片数。 默认值为 5。 |
shardFetchinterval |
STRING |
轮询以重新分片的频率。 默认值为“1s”。 |
coalesceThresholdBlockSize |
INTEGER (>0) |
自动合并发生的阈值。 默认值为 10,000,000。 |
coalesce |
BOOLEAN |
为 true 则联合预提取的请求。 默认为 true 。 |
coalesceBinSize |
INTEGER (>0) |
合并后的近似块大小。 默认值为 128,000,000。 |
reuseKinesisClient |
BOOLEAN |
为 true 则重用缓存中存储的 Kinesis 客户端。 默认值为 true (PE 群集上除外)。 |
clientRetries |
INTEGER (>0) |
重试方案中的重试次数。 默认值为 5。 |
返回
具有以下架构的 Kinesis 记录表:
名称 | 数据类型 | Nullable | 标准 | 描述 |
---|---|---|---|---|
partitionKey |
STRING |
否 | 用于在流的分片之间分发数据的键。 具有相同分区键的所有数据记录将从同一分片读取。 | |
data |
BINARY |
否 | kinesis 数据有效负载,base-64 编码。 | |
stream |
STRING |
否 | 从中读取数据的流的名称。 | |
shardId |
STRING |
否 | 从中读取数据的分片的唯一标识符。 | |
sequenceNumber |
BIGINT |
否 | 记录分片中该记录的唯一标识符。 | |
approximateArrivalTimestamp |
TIMESTAMP |
否 | 将记录插入流中的大致时间。 |
(stream, shardId, sequenceNumber)
列构成主键。
示例
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');