funzione a valore read_kafka
table
Si applica a: Databricks SQL Databricks Runtime 13.3 LTS e versioni successive
Legge i dati da un cluster Apache Kafka e restituisce i dati in formato tabulare.
Può leggere i dati da uno o più argomenti kafka. Supporta sia le query batch che l'inserimento in streaming.
Sintassi
read_kafka([option_key => option_value ] [, ...])
Argomenti
Questa funzione richiede la chiamata di parametri denominati.
-
option_key
: nome dell'opzione da configurare. È necessario utilizzare i backtick (') per le opzioni che contengono punti (.
). -
option_value
: Un'espressione costante per determinare l'opzione set. Accetta valori letterali e funzioni scalari.
Valori restituiti
I record letti da un cluster Apache Kafka con il schemaseguente:
-
key BINARY
: chiave del record Kafka. -
value BINARY NOT NULL
: valore del record Kafka. -
topic STRING NOT NULL
: nome dell'argomento Kafka da cui viene letto il record. -
partition INT NOT NULL
: L'ID di Kafka partition da cui viene letta la registrazione. -
offset BIGINT NOT NULL
: il numero del record offset nel sistemaTopicPartition
Kafka. -
timestamp TIMESTAMP NOT NULL
: valore timestamp per il record. IltimestampType
column definisce a cosa corrisponde questo timestamp. -
timestampType INTEGER NOT NULL
: tipo del timestamp specificato neltimestamp
column. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: intestazione values fornita come parte del record (se abilitata).
Esempi
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opzioni
È possibile trovare un list dettagliato delle opzioni nella documentazione di Apache Spark .
Opzioni obbligatorie
Fornire l'opzione seguente per la connessione al cluster Kafka.
Opzione |
---|
bootstrapServers Tipo: String Un elenco list delimitato da virgole di coppie di host/porta che puntano al cluster Kafka. Valore predefinito: Nessuno |
Fornire solo una delle opzioni seguenti per configurare gli argomenti Kafka da cui eseguire il pull dei dati.
Opzione |
---|
assign Tipo: String Stringa JSON che contiene le partizioni di argomento specifiche da utilizzare. Ad esempio, per '{"topicA":[0,1],"topicB":[2,4]}' le partizioni 0 e 1° di topicA verranno utilizzate.Valore predefinito: Nessuno |
subscribe Tipo: String Un list delimitato da virgole di argomenti Kafka da cui leggere. Valore predefinito: Nessuno |
subscribePattern Tipo: String Argomenti corrispondenti a un'espressione regolare a cui sottoscrivere. Valore predefinito: Nessuno |
Altre opzioni
read_kafka
può essere usato nelle query batch e nelle query di streaming. Le opzioni seguenti specificano il tipo di query a cui si applicano.
Opzione |
---|
endingOffsets Tipo: Tipo di query: String solo batchGli offset da leggere per una query batch fino a quando non si arriva a "latest" per specificare i record più recenti, o una stringa JSON che specifica un offset di chiusura per ogni TopicPartition. In JSON -1 come offset può essere usato per fare riferimento alla versione più recente.
-2 (più antico) come offset non è consentito.Valore predefinito: "latest" |
endingOffsetsByTimestamp Tipo: Tipo di query: String solo batchStringa JSON che specifica un timestamp finale da leggere fino a quando non per ogni TopicPartition. I timestamp devono essere forniti come valore lungo del timestamp in millisecondi, ad esempio , 1970-01-01 00:00:00 UTC 1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente .endingOffsetsByTimestamp ha la precedenza rispetto a endingOffsets .Valore predefinito: Nessuno |
endingTimestamp Tipo: Tipo di query: String solo batchValore stringa del timestamp espresso in millisecondi da 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Se Kafka non restituisce l'offsetcorrispondente, il offset verrà set alla versione più recente. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nota: endingTimestamp ha la precedenza su endingOffsetsByTimestamp eendingOffsets .Valore predefinito: Nessuno |
includeHeaders Tipo: Tipo di query: Boolean streaming e batchIndica se includere le intestazioni Kafka nella riga. Valore predefinito: false |
kafka.<consumer_option> Tipo: Tipo di query: String streaming e batchQualsiasi opzione specifica del consumer Kafka può essere passata con il kafka. prefisso . Queste opzioni devono essere racchiuse da backtick quando specificato; in caso contrario, si get un errore del parser. Le opzioni sono disponibili nella documentazione di Kafka.Nota: non è consigliabile set le opzioni seguenti con questa funzione: key.deserializer , value.deserializer , bootstrap.servers group.id Valore predefinito: Nessuno |
maxOffsetsPerTrigger Tipo: Tipo di query: Long solo streamingValore limit del numero massimo di offset o righe elaborate per l'intervallo di trigger. Il numero totale specificato di offset verrà suddiviso proporzionalmente tra TopicPartitions. Valore predefinito: Nessuno |
startingOffsets Tipo: Tipo di query: String streaming e batchQuando si avvia una query, il punto di partenza è "earliest" , che proviene dagli offset meno recenti, "latest" che proviene solo dagli offset più recenti, o una stringa JSON che specifica un offset iniziale per ogni TopicPartition. Nel codice JSON, -2 come offset può essere usato per fare riferimento alla versione più antica, -1 alla più recente.Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in JSON) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: "latest" per lo streaming, "earliest" per batch |
startingOffsetsByTimestamp Tipo: Tipo di query: String streaming e batchStringa JSON che specifica un timestamp iniziale per ogni TopicPartition. I timestamp devono essere specificati come valore lungo del timestamp in millisecondi, ad 1970-01-01 00:00:00 UTC esempio 1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce il offsetcorrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp ha la precedenza rispetto a startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: Nessuno |
startingOffsetsByTimestampStrategy Tipo: Tipo di query: String streaming e batchQuesta strategia viene usata quando l'offset iniziale specificato dal timestamp, sia a livello globale che per partition, non corrisponde a quello restituito da Kafka per offset. Le strategie disponibili sono: - "error" : non è possibile eseguire la query- "latest" : assegna le offset più recenti a queste partizioni affinché Spark possa leggere i record più nuovi da esse nei micro-batch successivi.Valore predefinito: "error" |
startingTimestamp Tipo: Tipo di query: String streaming e batchValore stringa del timestamp espresso in millisecondi da 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nel caso in cui Kafka non restituisca il offsetcorrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy .startingTimestamp ha la precedenza su startingOffsetsByTimestamp e startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno prima. Valore predefinito: Nessuno |
Nota
Il offset restituito per ogni partition è il primo offset il cui timestamp è maggiore o uguale al timestamp specificato nel partitioncorrispondente. Il comportamento varia tra le opzioni se Kafka non restituisce il offset abbinato; in tal caso, controllare la descrizione di ogni opzione.
Spark passa semplicemente le informazioni sul timestamp a KafkaConsumer.offsetsForTimes
e non interpreta o ragiona sul valore. Per altri dettagli su KafkaConsumer.offsetsForTimes
, vedere la documentazione. Inoltre, il significato del timestamp qui può variare in base alla configurazione Kafka (log.message.timestamp.type
). Per informazioni dettagliate, vedere la documentazione di Apache Kafka.