read_kafka
テーブル値関数
適用対象: Databricks SQL Databricks Runtime 13.3 LTS 以上
Apache Kafka クラスターからデータを読み取り、表形式でデータを返します。
1 つ以上の Kafka トピックからデータを読み取ることができます。 バッチ クエリとストリーミング インジェストの両方がサポートされています。
構文
read_kafka([option_key => option_value ] [, ...])
引数
この関数には、名前付きパラメーター呼び出しが必要です。
option_key
: 構成するオプションの名前。 ドット(.
) を含むオプションには、バッククウォート (`) を使用する必要があります。option_value
: オプションを設定する定数式。 リテラルとスカラー関数を受け入れます。
返品
次のスキーマを使用して Apache Kafka クラスターから読み取られたレコード。
key BINARY
: Kafka レコードのキー。value BINARY NOT NULL
: Kafka レコードの値。topic STRING NOT NULL
: レコードが読み取られた Kafka トピックの名前。partition INT NOT NULL
: レコードが読み取られた Kafka パーティションの ID。offset BIGINT NOT NULL
: KafkaTopicPartition
内のレコードのオフセット番号。timestamp TIMESTAMP NOT NULL
: レコードのタイムスタンプ値。timestampType
列は、このタイムスタンプが対応する内容を定義します。timestampType INTEGER NOT NULL
:timestamp
列に指定されたタイムスタンプの型。headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: レコードの一部として提供されるヘッダー値 (有効な場合)。
例
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
[オプション]
オプションの詳細な一覧については、Apache Spark のドキュメントを参照してください。
必須のオプション
Kafka クラスターに接続するためのオプションを次に示します。
オプション |
---|
bootstrapServers 型: String Kafka クラスターを指すホストとポートのペアのコンマ区切りのリスト。 既定値: なし |
データをプルする Kafka トピックを構成するには、次のいずれかのオプションのみを指定します。
オプション |
---|
assign 型: String 使用する特定のトピック パーティションを含む JSON 文字列。 たとえば、 '{"topicA":[0,1],"topicB":[2,4]}' の場合、topicA の 0 番目と 1 番目のパーティションは次で使用されます。既定値: なし |
subscribe 型: String 読み取る Kafka トピックのコンマ区切りのリスト。 既定値: なし |
subscribePattern 型: String サブスクライブする正規表現の一致するトピック。 既定値: なし |
その他のオプション
read_kafka
は、バッチ クエリとストリーミング クエリで使用できます。 以下のオプションでは、適用するクエリの種類を指定します。
オプション |
---|
endingOffsets 種類: String クエリの種類: バッチのみバッチ クエリまで読み取るオフセット。最新のレコードを指定する場合は "latest" 、または各 TopicPartition の終了オフセットを指定する場合は JSON 文字列。 JSON では、オフセットとして -1 を使用して latest を示すことができます。 オフセットとしての -2 (最も早い) は許可されません。既定値: "latest" |
endingOffsetsByTimestamp 種類: String クエリの種類: バッチのみ各 TopicPartition まで読み取る終了タイムスタンプを指定する JSON 文字列。 タイムスタンプは、 1970-01-01 00:00:00 UTC 以降のミリ秒単位の長い値で指定する必要があります (例1686444353000 = タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。endingOffsetsByTimestamp は、endingOffsets よりも優先されます。既定値: なし |
endingTimestamp 種類: String クエリの種類: バッチのみ以下の時刻以降のタイムスタンプの文字列値 (ミリ秒単位): 1970-01-01 00:00:00 UTC (例: "1686444353000" )。 Kafka が一致したオフセットを返さない場合、オフセットは最新の状態に設定されます。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 注: endingTimestamp は endingOffsetsByTimestamp よりも優先されますendingOffsets =既定値: なし |
includeHeaders 種類: Boolean クエリの種類: ストリーミングとバッチ行に Kafka ヘッダーを含めるかどうか。 既定値: false |
kafka.<consumer_option> 種類: String クエリの種類: ストリーミングとバッチKafka コンシューマー固有のオプションは、 kafka. プレフィックスと共に渡すことができます。 指定した場合、これらのオプションはバッククォータで囲む必要があります。そうしないと、パーサー エラーが発生します。 Kafka のオプションについては、ドキュメントを参照してください。注: この関数では、次のオプションを設定しないでください。 既定値: なし |
maxOffsetsPerTrigger 種類: Long クエリの種類: ストリーミングのみトリガー間隔ごとに処理される行のオフセットの最大数に対するレート制限。 指定されたオフセットの総数は、TopicPartitions 間で比例的に分割されます。 既定値: なし |
startingOffsets 種類: String クエリの種類: ストリーミングとバッチクエリが開始されたときの開始点。最も古いオフセットからの "earliest" 、最新のオフセットからの "latest" 、または各 TopicPartition の開始オフセットを指定する JSON 文字列。 JSON では、オフセットとして -2 を使用して earliest を、-1 で latest を示すことができます。注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。 既定値: ストリーミング用は "latest" 、バッチ用は "earliest" |
startingOffsetsByTimestamp 種類: String クエリの種類: ストリーミングとバッチ各 TopicPartition の開始タイムスタンプを指定する JSON 文字列。 タイムスタンプは、 1970-01-01 00:00:00 UTC 以降のミリ秒単位の長い値で指定する必要があります (例: 1686444353000 )。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 Kafka が一致したオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。startingOffsetsByTimestamp は、startingOffsets よりも優先されます。注: ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。 既定値: なし |
startingOffsetsByTimestampStrategy 種類: String クエリの種類: ストリーミングとバッチこの方法は、タイムスタンプ (グローバルまたはパーティションごと) で開始オフセットが指定された場合に、Kafka が返すオフセットと一致しない場合に使用されます。 使用可能な方法は次のとおりです。 - "error" : クエリが失敗します- "latest" : Spark が後のマイクロバッチでこれらのパーティションから新しいレコードを読み取ることができるように、これらのパーティションの最新のオフセットを割り当てます。既定値: "error" |
startingTimestamp 種類: String クエリの種類: ストリーミングとバッチ以下の時刻以降のタイムスタンプの文字列値 (ミリ秒単位): 1970-01-01 00:00:00 UTC (例: "1686444353000" )。 タイムスタンプを使用した動作の詳細については、以下の注意事項を参照してください。 Kafka が一致したオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。startingTimestamp は、startingOffsetsByTimestamp および startingOffsets よりも優先されます。注: ストリーミング クエリの場合、これは新しいクエリが開始された場合にのみ適用されます。 再起動されたストリーミング クエリは、クエリ チェックポイントで定義されているオフセットから続行されます。 クエリ中に新しく検出されたパーティションが最も早く開始されます。 既定値: なし |
Note
各パーティションに返されるオフセットは、タイムスタンプが対応するパーティション内の指定されたタイムスタンプと同等かそれ以上の最も古いオフセットです。 Kafka が一致したオフセットを返さない場合、各オプションの説明を確認する場合、動作はオプションによって異なります。
Spark は、タイムスタンプ情報を KafkaConsumer.offsetsForTimes
に渡すだけで、値の解釈や理由付けは行われません。 KafkaConsumer.offsetsForTimes
の詳細については、ドキュメントを参照してください。 また、ここでのタイムスタンプの意味は、Kafka 構成 (log.message.timestamp.type
) によって異なる場合があります。 詳細については、Apache Kafka のドキュメントをご覧ください。