共用方式為


read_kafka table-valued 函式

適用於:核取記號為「是」 Databricks SQL 核取記號為「是」 Databricks Runtime 13.3 LTS 和更新版本

從 Apache Kafka 叢集讀取數據,並以表格式傳回數據。

可以從一或多個 Kafka 主題讀取數據。 它同時支援批次查詢和串流擷取。

語法

read_kafka([option_key => option_value ] [, ...])

引數

此函式 需要具名參數調用

  • option_key:要設定的選項名稱。 您必須針對包含點 () 的選項使用反引號 (.')。
  • option_value:用來 set 選項的常數表達式。 接受常值和純量函式。

傳回

使用下列 schema從 Apache Kafka 叢集讀取的記錄:

  • key BINARY:Kafka 記錄的索引鍵。
  • value BINARY NOT NULL:Kafka 記錄的值。
  • topic STRING NOT NULL:讀取記錄的 Kafka 主題名稱。
  • partition INT NOT NULL:讀取記錄 partition Kafka 的標識碼。
  • offset BIGINT NOT NULL:Kafka TopicPartition中記錄的 offset 數目。
  • timestamp TIMESTAMP NOT NULL:記錄的時間戳值。 timestampType column 會定義此時間戳對應的內容。
  • timestampType INTEGER NOT NULLtimestampcolumn中指定的時間戳類型。
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>:標頭 values 作為記錄的一部分提供(如果已啟用)。

範例

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

選項。

您可以在 Apache Spark 檔案中找到詳細的選項列表 list。

必要選項

提供下列選項以連線到 Kafka 叢集。

選項
bootstrapServers

類型:String (英文)

指向 Kafka 叢集之主機/埠組的逗號分隔 list。

預設值:無

只提供下列其中一個選項,以設定要從中提取數據的 Kafka 主題。

選項
assign

類型:String (英文)

JSON 字串,其中包含要取用的特定主題分割區。 例如,針對 '{"topicA":[0,1],"topicB":[2,4]}',topicA 的 0 和第 1 個分割區將會取用自 。

預設值:無
subscribe

類型:String (英文)

要讀取之 Kafka 主題的逗號分隔 list。

預設值:無
subscribePattern

類型:String (英文)

正則表達式,符合要訂閱的主題。

預設值:無

其他選項

read_kafka 可用於批次查詢和串流查詢。 下列選項會指定其套用的查詢類型。

選項
endingOffsets

類型:查詢類型: String 僅限批次

要讀取直到批次查詢的偏移量,可以使用 "latest" 來指定最新的記錄,或是使用 JSON 字串來為每個 TopicPartition 指定一個結束的 offset。 在 JSON 中,-1 作為 offset 可用來參考最新。 不允許 -2(最早)用作 offset。

預設值:"latest"
endingOffsetsByTimestamp

類型:查詢類型: String 僅限批次

JSON 字串,指定要讀取的結束時間戳,直到每個 TopicPartition 為止。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC為單位提供時間戳的長值,例如 ,
1686444353000. 如需時間戳的行為詳細數據,請參閱 下方 的附註。
endingOffsetsByTimestamp 優先於 endingOffsets

預設值:無
endingTimestamp

類型:查詢類型: String 僅限批次

時間戳的字串值,以毫秒為單位,因為
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 如果 Kafka 未傳回相符的 offset,offset 將會 set 到最新。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 注意: endingTimestamp 優先於 endingOffsetsByTimestamp
endingOffsets.

預設值:無
includeHeaders

類型:查詢類型: Boolean 串流和批次

是否要在數據列中包含 Kafka 標頭。

預設值:false
kafka.<consumer_option>

類型:查詢類型: String 串流和批次

任何 Kafka 取用者特定選項都可以使用 kafka. 前置詞傳入。 提供這些選項時,必須以反引號括住,否則將出現 get 解析錯誤。 您可以在 Kafka 檔中找到選項。

注意:您不應該使用此函式 set 下列選項:
key.deserializer、 、 value.deserializerbootstrap.serversgroup.id

預設值:無
maxOffsetsPerTrigger

類型:查詢類型: Long 僅串流

每個觸發程式間隔所處理之位移或數據列數目上限的速率 limit。 指定的位移總數將會按比例分割到 TopicPartitions。

預設值:無
startingOffsets

類型:查詢類型: String 串流和批次

查詢啟動時的起點,可能是從最早位移 "earliest"、從最新位移開始的 "latest",或是使用 JSON 字串為每個 TopicPartition 指定起始 offset。 在 JSON 中,-2 作為 offset 可用來指向最早,而 -1 則到最新。

注意:對於批次查詢,不允許使用 -1 進行批次查詢,否則不允許使用 -1。 對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。

默認值: "latest" 用於串流處理, "earliest" 用於批次
startingOffsetsByTimestamp

類型:查詢類型: String 串流和批次

JSON 字串,指定每個 TopicPartition 的起始時間戳。 時間戳必須以毫秒 1970-01-01 00:00:00 UTC為單位提供時間戳的長值,例如 1686444353000。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的 offset,則行為會依據選項 startingOffsetsByTimestampStrategy的值來決定。
startingOffsetsByTimestamp 優先於 startingOffsets

注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索的分割將最早啟動。

預設值:無
startingOffsetsByTimestampStrategy

類型:查詢類型: String 串流和批次

當指定的起始 offset 時間戳(無論是全域還是每個 partition)與 Kafka 傳回的 offset 不一致時,就會使用此策略。 可用的策略如下:

- "error":查詢失敗
- "latest":為這些分割區指派最新的 offset,讓Spark可以在稍後的微批次中從這些分割區讀取較新的記錄。

預設值:"error"
startingTimestamp

類型:查詢類型: String 串流和批次

時間戳的字串值,以毫秒為單位,因為
1970-01-01 00:00:00 UTC,例如 "1686444353000"。 如需時間戳的行為詳細數據,請參閱 下方 的附註。 如果 Kafka 未傳回相符的 offset,則行為會依照選項 startingOffsetsByTimestampStrategy的值來進行。
startingTimestamp 優先於 startingOffsetsByTimestampstartingOffsets

注意:對於串流查詢,這隻適用於啟動新的查詢時。 重新啟動的串流查詢將會從查詢檢查點中定義的位移繼續進行。 查詢期間新探索到的數據分割最早會啟動。

預設值:無

注意

針對每個 partition,傳回的 offset 為最早的 offset,其時間戳大於或等於對應 partition中給定的時間戳。 如果 Kafka 未傳回相符 offset,則行為會因選項而異- 請檢查每個選項的描述。

Spark 只會將時間戳信息傳遞至 KafkaConsumer.offsetsForTimes,而且不會解譯或解釋值的原因。 如需 的詳細資訊 KafkaConsumer.offsetsForTimes,請參閱 。 此外,這裡的時間戳意義可能會根據 Kafka 設定 (log.message.timestamp.type) 而有所不同。 如需詳細資訊,請參閱 Apache Kafka 檔