共用方式為


read_kinesis 串流的 table值函數

適用於:核取記號為「是」 Databricks SQL 核取記號為「是」 Databricks Runtime 13.3 LTS 和更新版本

從一個或多個流讀取的 Kinesis 記錄返回為 table。

語法

read_kinesis ( { parameter => value } [, ...] )

引數

read_kinesis 需要具名參數調用

唯一必要的自變數是 streamName。 所有其他自變數都是選擇性的。

這裡簡短說明自變數。 如需詳細資訊,請參閱 Amazon Kinesis 檔。

有各種連線選項可連線及向 AWS 進行驗證。 awsAccessKeyawsSecretKey 可以使用 秘密函式在函式自變數中指定、手動 set 自變數或設定為環境變數,如下所示。 roleArnroleExternalIDroleSessionName 也可以用來使用實例配置檔向 AWS 進行驗證。 如果未指定這些專案,則會使用預設的 AWS 提供者鏈結。

參數 類型 描述
streamName STRING 一或多個 kinesis 數據流的必要逗號分隔 list。
awsAccessKey STRING 如果有的話,AWS 存取密鑰。 也可以透過 AWS 預設認證提供者鏈結所支援的各種選項來指定,包括環境變數 (AWS_ACCESS_KEY_ID) 和認證設定檔檔案。
awsSecretKey STRING 對應至存取金鑰的秘密金鑰。 可以在自變數中指定,或透過 AWS 預設認證提供者鏈結支援的各種選項來指定,包括環境變數(AWS_SECRET_KEYAWS_SECRET_ACCESS_KEY),以及 credentials 配置檔檔案。
roleArn STRING 存取 Kinesis 時要擔任之角色的 Amazon 資源名稱。
roleExternalId STRING 在委派 AWS 帳戶的存取權時使用。
roleSessionName STRING AWS 角色會話名稱。
stsEndpoint STRING 要求暫時存取的端點 credentials。
region STRING 要指定之數據流的區域。 預設值為本機解析的區域。
endpoint STRING Kinesis 數據流的區域端點。 預設值為本機解析的區域。
initialPosition STRING 從數據流中讀取的起始位置。 其中一個:『latest』(預設值)、『trim_horizon』、『最早』、『at_timestamp』。
consumerMode STRING 其中一個: 'polling' (預設值), 或 'EFO' (enhanced-fan-out) 。
consumerName STRING 取用者的名稱。 所有取用者都會加上 『databricks_』。 預設值是空字串。
registerConsumerTimeoutInterval STRING 等候 Kinesis EFO 取用者在擲回錯誤之前向 Kinesis 數據流註冊的最大逾時。 預設值為 『300s』。
requireConsumerDeregistration BOOLEAN true 在查詢終止時取消註冊 EFO 取用者。 預設值為 false
deregisterConsumerTimeoutInterval STRING 等候 Kinesis EFO 取用者在擲回錯誤之前,先向 Kinesis 數據流取消註冊的最大逾時。 預設值為 『300s』。
consumerRefreshInterval STRING 檢查和重新整理取用者的間隔。 預設值為 『300s』。

下列自變數用於控制 Kinesis 的讀取輸送量和延遲:

參數 類型 描述
maxRecordsPerFetch INTEGER (>0) 選擇性,預設為10,000筆記錄,每個API要求都會讀取 Kinesis。
maxFetchRate STRING 每個分區擷取數據的速度有多快。 介於 '1.0' 和 '2.0' 之間的值,以 MB/秒為單位。 預設值為 『1.0』。
minFetchPeriod STRING 連續預先擷取嘗試之間的等候時間上限。 默認值為 『400ms』。
maxFetchDuration STRING 緩衝預先擷取新數據的最大持續時間。 預設值為 『10s』。
fetchBufferSize STRING 下一個觸發程序的數據量。 默認值為 『20gb』。
shardsPerTask INTEGER (>0) 每個 Spark 工作平行擷取的 Kinesis 分區數目。 預設值為 5。
shardFetchinterval STRING 輪詢重新分區的頻率。 預設值為 『1s』。
coalesceThresholdBlockSize INTEGER (>0) 自動聯合的臨界值。 默認值為 10,000,000。
coalesce BOOLEAN true 以聯合預先擷取的要求。 預設值為 true
coalesceBinSize INTEGER (>0) 聯合后的近似區塊大小。 默認值為 128,000,000。
reuseKinesisClient BOOLEAN true 表示重複使用儲存在快取中的 Kinesis 用戶端。 預設值為 true PE叢集上的例外。
clientRetries INTEGER (>0) 重試案例中的重試次數。 預設值為 5。

傳回

具有下列 schema的 Kinesis 記錄 table:

名稱 資料類型 Nullable 標準 描述
partitionKey STRING No 索引鍵,用來在數據流的分區之間散發數據。 具有相同 partition 索引鍵的所有數據記錄都會從相同的分區讀取。
data BINARY No 基底-64 編碼的 Kinesis 數據承載。
stream STRING No 讀取數據的數據流名稱是 where。
shardId STRING No 一個與讀取數據自 where 分區相關的唯一 identifier。
sequenceNumber BIGINT No 記錄在其分片內的唯一 identifier。
approximateArrivalTimestamp TIMESTAMP No 記錄插入數據流的大約時間。

columns (stream, shardId, sequenceNumber) 構成主鍵。

範例

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');