read_kafka
table-valued 函式
適用於: Databricks SQL Databricks Runtime 13.3 LTS 和更新版本
從 Apache Kafka 叢集讀取數據,並以表格式傳回數據。
可以從一或多個 Kafka 主題讀取數據。 它同時支援批次查詢和串流擷取。
語法
read_kafka([option_key => option_value ] [, ...])
引數
此函式 需要具名參數調用。
-
option_key
:要設定的選項名稱。 您必須針對包含點 () 的選項使用反引號 (.
')。 -
option_value
:用來 set 選項的常數表達式。 接受常值和純量函式。
傳回
使用下列 schema從 Apache Kafka 叢集讀取的記錄:
-
key BINARY
:Kafka 記錄的索引鍵。 -
value BINARY NOT NULL
:Kafka 記錄的值。 -
topic STRING NOT NULL
:讀取記錄的 Kafka 主題名稱。 -
partition INT NOT NULL
:讀取記錄 partition Kafka 的標識碼。 -
offset BIGINT NOT NULL
:KafkaTopicPartition
中記錄的 offset 數目。 -
timestamp TIMESTAMP NOT NULL
:記錄的時間戳值。timestampType
column 會定義此時間戳對應的內容。 -
timestampType INTEGER NOT NULL
:timestamp
column中指定的時間戳類型。 -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
:標頭 values 作為記錄的一部分提供(如果已啟用)。
範例
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
選項。
您可以在 Apache Spark 檔案中找到詳細的選項列表 list。
必要選項
提供下列選項以連線到 Kafka 叢集。
選項 |
---|
bootstrapServers 類型: String (英文)指向 Kafka 叢集之主機/埠組的逗號分隔 list。 預設值:無 |
只提供下列其中一個選項,以設定要從中提取數據的 Kafka 主題。
選項 |
---|
assign 類型: String (英文)JSON 字串,其中包含要取用的特定主題分割區。 例如,針對 '{"topicA":[0,1],"topicB":[2,4]}' ,topicA 的 0 和第 1 個分割區將會取用自 。預設值:無 |
subscribe 類型: String (英文)要讀取之 Kafka 主題的逗號分隔 list。 預設值:無 |
subscribePattern 類型: String (英文)正則表達式,符合要訂閱的主題。 預設值:無 |
其他選項
read_kafka
可用於批次查詢和串流查詢。 下列選項會指定其套用的查詢類型。
選項 |
---|
endingOffsets 類型:查詢類型: String 僅限批次要讀取直到批次查詢的偏移量,可以使用 "latest" 來指定最新的記錄,或是使用 JSON 字串來為每個 TopicPartition 指定一個結束的 offset。 在 JSON 中,-1 作為 offset 可用來參考最新。 不允許 -2 (最早)用作 offset。預設值: "latest" |
endingOffsetsByTimestamp 類型:查詢類型: String 僅限批次JSON 字串,指定要讀取的結束時間戳,直到每個 TopicPartition 為止。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC 為單位提供時間戳的長值,例如 ,1686444353000 . 如需時間戳的行為詳細數據,請參閱 下方 的附註。endingOffsetsByTimestamp 優先於 endingOffsets 。預設值:無 |
endingTimestamp 類型:查詢類型: String 僅限批次時間戳的字串值,以毫秒為單位,因為 1970-01-01 00:00:00 UTC ,例如 "1686444353000" 。 如果 Kafka 未傳回相符的 offset,offset 將會 set 到最新。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 注意: endingTimestamp 優先於 endingOffsetsByTimestamp 和endingOffsets .預設值:無 |
includeHeaders 類型:查詢類型: Boolean 串流和批次是否要在數據列中包含 Kafka 標頭。 預設值: false |
kafka.<consumer_option> 類型:查詢類型: String 串流和批次任何 Kafka 取用者特定選項都可以使用 kafka. 前置詞傳入。 提供這些選項時,必須以反引號括住,否則將出現 get 解析錯誤。 您可以在 Kafka 檔中找到選項。注意:您不應該使用此函式 set 下列選項: key.deserializer 、 、 value.deserializer 、 bootstrap.servers group.id 預設值:無 |
maxOffsetsPerTrigger 類型:查詢類型: Long 僅串流每個觸發程式間隔所處理之位移或數據列數目上限的速率 limit。 指定的位移總數將會按比例分割到 TopicPartitions。 預設值:無 |
startingOffsets 類型:查詢類型: String 串流和批次查詢啟動時的起點,可能是從最早位移 "earliest" 、從最新位移開始的 "latest" ,或是使用 JSON 字串為每個 TopicPartition 指定起始 offset。 在 JSON 中,-2 作為 offset 可用來指向最早,而 -1 則到最新。注意:對於批次查詢,不允許使用 -1 進行批次查詢,否則不允許使用 -1。 對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。 默認值: "latest" 用於串流處理, "earliest" 用於批次 |
startingOffsetsByTimestamp 類型:查詢類型: String 串流和批次JSON 字串,指定每個 TopicPartition 的起始時間戳。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC 為單位提供時間戳的長值,例如 1686444353000 。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的 offset,則行為會依據選項 startingOffsetsByTimestampStrategy 的值來決定。startingOffsetsByTimestamp 優先於 startingOffsets 。注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。 預設值:無 |
startingOffsetsByTimestampStrategy 類型:查詢類型: String 串流和批次當指定的起始 offset 時間戳(無論是全域還是每個 partition)與 Kafka 傳回的 offset 不一致時,就會使用此策略。 可用的策略如下: - "error" :查詢失敗- "latest" :為這些分割區指派最新的 offset,讓Spark可以在稍後的微批次中從這些分割區讀取較新的記錄。預設值: "error" |
startingTimestamp 類型:查詢類型: String 串流和批次時間戳的字串值,以毫秒為單位,因為 1970-01-01 00:00:00 UTC ,例如 "1686444353000" 。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的 offset,則行為會依照選項 startingOffsetsByTimestampStrategy 的值來進行。startingTimestamp 優先於 startingOffsetsByTimestamp 與 startingOffsets 。注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。 預設值:無 |
注意
針對每個 partition,傳回的 offset 為最早的 offset,其時間戳大於或等於對應 partition中給定的時間戳。 如果 Kafka 未傳回相符 offset,則行為會因選項而異- 請檢查每個選項的描述。
Spark 只會將時間戳信息傳遞至 KafkaConsumer.offsetsForTimes
,而且不會解譯或解釋值的原因。 如需 的詳細資訊 KafkaConsumer.offsetsForTimes
,請參閱 檔。 此外,這裡的時間戳意義可能會根據 Kafka 設定 (log.message.timestamp.type
) 而有所不同。 如需詳細資訊,請參閱 Apache Kafka 檔。