read_kinesis
串流的 table值函數
適用於: Databricks SQL Databricks Runtime 13.3 LTS 和更新版本
從一個或多個流讀取的 Kinesis 記錄返回為 table。
語法
read_kinesis ( { parameter => value } [, ...] )
引數
read_kinesis
需要具名參數調用。
唯一必要的自變數是 streamName
。 所有其他自變數都是選擇性的。
這裡簡短說明自變數。 如需詳細資訊,請參閱 Amazon Kinesis 檔。
有各種連線選項可連線及向 AWS 進行驗證。
awsAccessKey
和 awsSecretKey
可以使用 秘密函式在函式自變數中指定、手動 set 自變數或設定為環境變數,如下所示。
roleArn
、 roleExternalID
, roleSessionName
也可以用來使用實例配置檔向 AWS 進行驗證。
如果未指定這些專案,則會使用預設的 AWS 提供者鏈結。
參數 | 類型 | 描述 |
---|---|---|
streamName |
STRING |
一或多個 kinesis 數據流的必要逗號分隔 list。 |
awsAccessKey |
STRING |
如果有的話,AWS 存取密鑰。 也可以透過 AWS 預設認證提供者鏈結所支援的各種選項來指定,包括環境變數 (AWS_ACCESS_KEY_ID ) 和認證設定檔檔案。 |
awsSecretKey |
STRING |
對應至存取金鑰的秘密金鑰。 可以在自變數中指定,或透過 AWS 預設認證提供者鏈結支援的各種選項來指定,包括環境變數(AWS_SECRET_KEY 或 AWS_SECRET_ACCESS_KEY ),以及 credentials 配置檔檔案。 |
roleArn |
STRING |
存取 Kinesis 時要擔任之角色的 Amazon 資源名稱。 |
roleExternalId |
STRING |
在委派 AWS 帳戶的存取權時使用。 |
roleSessionName |
STRING |
AWS 角色會話名稱。 |
stsEndpoint |
STRING |
要求暫時存取的端點 credentials。 |
region |
STRING |
要指定之數據流的區域。 預設值為本機解析的區域。 |
endpoint |
STRING |
Kinesis 數據流的區域端點。 預設值為本機解析的區域。 |
initialPosition |
STRING |
從數據流中讀取的起始位置。 其中一個:『latest』(預設值)、『trim_horizon』、『最早』、『at_timestamp』。 |
consumerMode |
STRING |
其中一個: 'polling' (預設值), 或 'EFO' (enhanced-fan-out) 。 |
consumerName |
STRING |
取用者的名稱。 所有取用者都會加上 『databricks_』。 預設值是空字串。 |
registerConsumerTimeoutInterval |
STRING |
等候 Kinesis EFO 取用者在擲回錯誤之前向 Kinesis 數據流註冊的最大逾時。 預設值為 『300s』。 |
requireConsumerDeregistration |
BOOLEAN |
true 在查詢終止時取消註冊 EFO 取用者。 預設值為 false 。 |
deregisterConsumerTimeoutInterval |
STRING |
等候 Kinesis EFO 取用者在擲回錯誤之前,先向 Kinesis 數據流取消註冊的最大逾時。 預設值為 『300s』。 |
consumerRefreshInterval |
STRING |
檢查和重新整理取用者的間隔。 預設值為 『300s』。 |
下列自變數用於控制 Kinesis 的讀取輸送量和延遲:
參數 | 類型 | 描述 |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
選擇性,預設為10,000筆記錄,每個API要求都會讀取 Kinesis。 |
maxFetchRate |
STRING |
每個分區擷取數據的速度有多快。 介於 '1.0' 和 '2.0' 之間的值,以 MB/秒為單位。 預設值為 『1.0』。 |
minFetchPeriod |
STRING |
連續預先擷取嘗試之間的等候時間上限。 默認值為 『400ms』。 |
maxFetchDuration |
STRING |
緩衝預先擷取新數據的最大持續時間。 預設值為 『10s』。 |
fetchBufferSize |
STRING |
下一個觸發程序的數據量。 默認值為 『20gb』。 |
shardsPerTask |
INTEGER (>0) |
每個 Spark 工作平行擷取的 Kinesis 分區數目。 預設值為 5。 |
shardFetchinterval |
STRING |
輪詢重新分區的頻率。 預設值為 『1s』。 |
coalesceThresholdBlockSize |
INTEGER (>0) |
自動聯合的臨界值。 默認值為 10,000,000。 |
coalesce |
BOOLEAN |
true 以聯合預先擷取的要求。 預設值為 true 。 |
coalesceBinSize |
INTEGER (>0) |
聯合后的近似區塊大小。 默認值為 128,000,000。 |
reuseKinesisClient |
BOOLEAN |
true 表示重複使用儲存在快取中的 Kinesis 用戶端。 預設值為 true PE叢集上的例外。 |
clientRetries |
INTEGER (>0) |
重試案例中的重試次數。 預設值為 5。 |
傳回
具有下列 schema的 Kinesis 記錄 table:
名稱 | 資料類型 | Nullable | 標準 | 描述 |
---|---|---|---|---|
partitionKey |
STRING |
No | 索引鍵,用來在數據流的分區之間散發數據。 具有相同 partition 索引鍵的所有數據記錄都會從相同的分區讀取。 | |
data |
BINARY |
No | 基底-64 編碼的 Kinesis 數據承載。 | |
stream |
STRING |
No | 讀取數據的數據流名稱是 where。 | |
shardId |
STRING |
No | 一個與讀取數據自 where 分區相關的唯一 identifier。 | |
sequenceNumber |
BIGINT |
No | 記錄在其分片內的唯一 identifier。 | |
approximateArrivalTimestamp |
TIMESTAMP |
No | 記錄插入數據流的大約時間。 |
columns
(stream, shardId, sequenceNumber)
構成主鍵。
範例
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');