read_kafka
table-valued 函式
適用於: Databricks SQL Databricks Runtime 13.3 LTS 和更新版本
從 Apache Kafka 叢集讀取數據,並以表格式傳回數據。
可以從一或多個 Kafka 主題讀取數據。 它同時支援批次查詢和串流擷取。
語法
read_kafka([option_key => option_value ] [, ...])
引數
此函式 需要具名參數調用。
option_key
:要設定的選項名稱。 您必須針對包含點 () 的選項使用反引號 (.
')。option_value
:要設定選項的常數表達式。 接受常值和純量函式。
傳回
使用下列架構從 Apache Kafka 叢集讀取的記錄:
key BINARY
:Kafka 記錄的索引鍵。value BINARY NOT NULL
:Kafka 記錄的值。topic STRING NOT NULL
:讀取記錄的 Kafka 主題名稱。partition INT NOT NULL
:讀取記錄的 Kafka 分割區識別碼。offset BIGINT NOT NULL
:KafkaTopicPartition
中記錄的位移編號。timestamp TIMESTAMP NOT NULL
:記錄的時間戳值。 數據timestampType
行會定義此時間戳對應至的專案。timestampType INTEGER NOT NULL
:資料列中指定的timestamp
時間戳類型。headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
:提供做為記錄一部分的標頭值(如果已啟用)。
範例
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
選項。
您可以在 Apache Spark 檔案中找到詳細的選項清單。
必要選項
提供下列選項以連線到 Kafka 叢集。
選項 |
---|
bootstrapServers 類型: String (英文)指向 Kafka 叢集的主機/埠組逗號分隔清單。 預設值:無 |
只提供下列其中一個選項,以設定要從中提取數據的 Kafka 主題。
選項 |
---|
assign 類型: String (英文)JSON 字串,其中包含要取用的特定主題分割區。 例如,針對 '{"topicA":[0,1],"topicB":[2,4]}' ,topicA 的 0 和第 1 個分割區將會取用自 。預設值:無 |
subscribe 類型: String (英文)要讀取的 Kafka 主題逗號分隔清單。 預設值:無 |
subscribePattern 類型: String (英文)正則表達式,符合要訂閱的主題。 預設值:無 |
其他選項
read_kafka
可用於批次查詢和串流查詢。 下列選項會指定其套用的查詢類型。
選項 |
---|
endingOffsets 類型:查詢類型: String 僅限批次要讀取直到批次查詢的位移、 "latest" 指定最新記錄,或是指定每個 TopicPartition 結束位移的 JSON 字串。 在 JSON 中, -1 作為位移可用來參考最新。 -2 (最早)不允許位移。預設值: "latest" |
endingOffsetsByTimestamp 類型:查詢類型: String 僅限批次JSON 字串,指定要讀取的結束時間戳,直到每個 TopicPartition 為止。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC 為單位提供時間戳的長值,例如 ,1686444353000 . 如需時間戳的行為詳細數據,請參閱 下方 的附註。endingOffsetsByTimestamp 優先於 endingOffsets 。預設值:無 |
endingTimestamp 類型:查詢類型: String 僅限批次時間戳的字串值,以毫秒為單位,因為 1970-01-01 00:00:00 UTC ,例如 "1686444353000" 。 如果 Kafka 未傳回相符的位移,則會將位移設定為最新位移。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 注意: endingTimestamp 優先於 endingOffsetsByTimestamp 和endingOffsets .預設值:無 |
includeHeaders 類型:查詢類型: Boolean 串流和批次是否要在數據列中包含 Kafka 標頭。 預設值: false |
kafka.<consumer_option> 類型:查詢類型: String 串流和批次任何 Kafka 取用者特定選項都可以使用 kafka. 前置詞傳入。 提供時,這些選項必須以反引弧括住,否則您將會收到剖析器錯誤。 您可以在 Kafka 檔中找到選項。注意:您不應該使用此函式來設定下列選項: key.deserializer 、 、 value.deserializer 、 bootstrap.servers group.id 預設值:無 |
maxOffsetsPerTrigger 類型:查詢類型: Long 僅串流每個觸發程式間隔所處理之位移或數據列數目上限的速率限制。 指定的位移總數將會按比例分割到 TopicPartitions。 預設值:無 |
startingOffsets 類型:查詢類型: String 串流和批次查詢啟動時的起點,無論是 "earliest" 從最早位移, "latest" 也就是來自最新的位移,或是指定每個 TopicPartition 起始位移的 JSON 字串。 在 JSON 中, -2 作為位移可用來參考最早的 -1 最新的位移。注意:對於批次查詢,不允許使用 -1 進行批次查詢,否則不允許使用 -1。 對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。 默認值: "latest" 用於串流處理, "earliest" 用於批次 |
startingOffsetsByTimestamp 類型:查詢類型: String 串流和批次JSON 字串,指定每個 TopicPartition 的起始時間戳。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC 為單位提供時間戳的長值,例如 1686444353000 。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的位移,則行為會遵循選項 startingOffsetsByTimestampStrategy 的值。startingOffsetsByTimestamp 優先於 startingOffsets 。注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。 預設值:無 |
startingOffsetsByTimestampStrategy 類型:查詢類型: String 串流和批次當指定的起始時間戳(全域或每個分割區)與傳回的位移 Kafka 不相符時,會使用此策略。 可用的策略如下: - "error" :查詢失敗- "latest" :指派這些分割區的最新位移,讓Spark可以在稍後的微批次中從這些分割區讀取較新的記錄。預設值: "error" |
startingTimestamp 類型:查詢類型: String 串流和批次時間戳的字串值,以毫秒為單位,因為 1970-01-01 00:00:00 UTC ,例如 "1686444353000" 。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的位移,則行為會遵循選項 startingOffsetsByTimestampStrategy 的值。startingTimestamp 優先於 startingOffsetsByTimestamp 與 startingOffsets 。注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。 預設值:無 |
注意
每個分割區傳回的位移是最早的位移,其時間戳大於或等於對應數據分割中的指定時間戳。 如果 Kafka 未傳回相符位移,則行為會因選項而異 - 檢查每個選項的描述。
Spark 只會將時間戳信息傳遞至 KafkaConsumer.offsetsForTimes
,而且不會解譯或解釋值的原因。 如需 的詳細資訊 KafkaConsumer.offsetsForTimes
,請參閱 檔。 此外,這裡的時間戳意義可能會根據 Kafka 設定 (log.message.timestamp.type
) 而有所不同。 如需詳細資訊,請參閱 Apache Kafka 檔。