read_kinesis streaming table-valued function

Applies to: check marked yes Databricks SQL check marked yes Databricks Runtime 13.3 LTS and above

Returns a table with records read from Kinesis from one or more streams.

Syntax

read_kinesis ( { parameter => value } [, ...] )

Arguments

read_kinesis requires named parameter invocation.

The only required argument is streamName. All other arguments are optional.

The descriptions of the arguments are brief here. For more details, see the Amazon Kinesis documentation.

There are various connection options to connect and authenticate with AWS. awsAccessKey, and awsSecretKey can either be specified in the function arguments using the secret function, manually set in the arguments, or configured as environment variables as indicated below. roleArn, roleExternalID, roleSessionName can also be used to authenticate with AWS by using instance profiles. If none of these are specified, it will use the default AWS provider chain.

Parameter Type Description
streamName STRING Required, comma-separated list of one or more kinesis streams.
awsAccessKey STRING The AWS Access key, if any. Can also be specified through the various options supported through the AWS default credential provider chain including environment variables (AWS_ACCESS_KEY_ID) and a credential profiles file.
awsSecretKey STRING The secret key which corresponds to the access key. Can be specified either in the arguments or through the various options supported through the AWS default credential provider chain including environment variables (AWS_SECRET_KEY or AWS_SECRET_ACCESS_KEY) and a credentials profiles file.
roleArn STRING Amazon resource name of the role to assume when accessing Kinesis.
roleExternalId STRING Used when delegating access to the AWS account.
roleSessionName STRING AWS role session name.
stsEndpoint STRING An endpoint for requesting temporary access credentials.
region STRING Region for the streams to be specified. The default is the locally resolved region.
endpoint STRING regional endpoint for Kinesis data streams. The default is the locally resolved region.
initialPosition STRING Starting position for reading from in the stream. One of: ‘latest’ (default), ‘trim_horizon’, ‘earliest’, ‘at_timestamp’.
consumerMode STRING One of: ‘polling’ (default), or ‘EFO’ (enhanced-fan-out).
consumerName STRING The name of the consumer. All consumers are prefixed with ‘databricks_’. The default is an empty string.
registerConsumerTimeoutInterval STRING the max timeout to wait for the Kinesis EFO consumer to be registered with the Kinesis stream before throwing an error. The default is ‘300s’.
requireConsumerDeregistration BOOLEAN true to de-register the EFO consumer on query termination. Default is false.
deregisterConsumerTimeoutInterval STRING The max timeout to wait for the Kinesis EFO consumer to be deregistered with the Kinesis stream before throwing an error. The default is ‘300s’.
consumerRefreshInterval STRING The interval at which the consumer is checked and refreshed. The default is ‘300s’.

The following arguments are used for controlling the read throughput and latency for Kinesis:

Parameter Type Description
maxRecordsPerFetch INTEGER (>0) Optional, with a default of 10,000 records to be read per API request to Kinesis.
maxFetchRate STRING How fast to prefetch data per shard. A value between ‘1.0’ and ‘2.0’ that’s measured in MB/s. The default is ‘1.0’.
minFetchPeriod STRING The maximum wait time between consecutive prefetch attempts. The default is ‘400ms’.
maxFetchDuration STRING The maximum duration to buffer prefetched new data. The default is ’10s’.
fetchBufferSize STRING The amount of data for the next trigger. The default is ‘20gb’.
shardsPerTask INTEGER (>0) The number of Kinesis shards to prefetch from in parallel per spark task. The default is 5.
shardFetchinterval STRING How often to poll for resharding. The default is ‘1s’.
coalesceThresholdBlockSize INTEGER (>0) The threshold at which automatic coalesce occurs. The default is 10,000,000.
coalesce BOOLEAN true to coalesce prefetched requests. The default is true.
coalesceBinSize INTEGER (>0) The approximate block size after coalescing. The default is 128,000,000.
reuseKinesisClient BOOLEAN true to reuse the Kinesis client stored in the cache. The default is true except on a PE cluster.
clientRetries INTEGER (>0) The number of retries in the retry scenario. The default is 5.

Returns

A table of Kinesis records with the following schema:

Name Data type Nullable Standard Description
partitionKey STRING No A key that is used to distribute data among the shards of a stream. All data records with the same partition key will be read from the same shard.
data BINARY No The kinesis data payload, base-64 encoded.
stream STRING No The name of the stream where the data was read from.
shardId STRING No A unique identifier for the shard where the data was read from.
sequenceNumber BIGINT No The unique identifier of the record within its shard.
approximateArrivalTimestamp TIMESTAMP No The approximate time that the record was inserted into the stream.

The columns (stream, shardId, sequenceNumber) constitute a primary key.

Examples

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');