read_kafka
funzione con valori di tabella
Si applica a: Databricks SQL
Databricks Runtime 13.3 LTS e versioni successive
Legge i dati da un cluster Apache Kafka e restituisce i dati in formato tabulare.
Può leggere i dati da uno o più topic Kafka. Supporta sia le query batch che l'inserimento in streaming.
Sintassi
read_kafka([option_key => option_value ] [, ...])
Argomenti
Questa funzione richiede la chiamata di parametri denominati.
-
option_key
: nome dell'opzione da configurare. È necessario usare i backtick () for options that contain dots (
.`). -
option_value
: espressione costante per impostare l'opzione . Accetta valori letterali e funzioni scalari.
Restituzioni
I record letti da un cluster Apache Kafka con lo schema seguente:
-
key BINARY
: la chiave del record Kafka. -
value BINARY NOT NULL
: Il valore del record Kafka. -
topic STRING NOT NULL
: nome dell'argomento Kafka da cui viene letto il record. -
partition INT NOT NULL
: ID della partizione Kafka da cui viene letto il record. -
offset BIGINT NOT NULL
: il numero di offset del record inTopicPartition
Kafka. -
timestamp TIMESTAMP NOT NULL
: valore timestamp per il record. La colonnatimestampType
definisce a cosa corrisponde questo timestamp. -
timestampType INTEGER NOT NULL
: tipo del timestamp specificato nella colonnatimestamp
. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: valori di intestazione forniti come parte del record (se abilitato).
Esempi
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opzioni
È possibile trovare un elenco dettagliato delle opzioni nella documentazione di Apache Spark.
Opzioni obbligatorie
Fornire l'opzione seguente per la connessione al cluster Kafka.
Opzione |
---|
bootstrapServers Tipo: String Elenco delimitato da virgole di coppie host/porta che puntano al cluster Kafka. Valore predefinito: Nessuno |
Fornire solo una delle opzioni seguenti per configurare gli argomenti Kafka da cui eseguire il pull dei dati.
Opzione |
---|
assign Tipo: String Stringa JSON che contiene le partizioni di argomento specifiche da utilizzare. Ad esempio, per '{"topicA":[0,1],"topicB":[2,4]}' , verranno lette le partizioni 0 e 1 di topicA.Valore predefinito: Nessuno |
subscribe Tipo: String Elenco di temi Kafka separati da virgole da cui leggere. Valore predefinito: Nessuno |
subscribePattern Tipo: String Un'espressione regolare che corrisponde agli argomenti da sottoscrivere. Valore predefinito: Nessuno |
Altre opzioni
read_kafka
può essere usato nelle query batch e nelle query di streaming. Le opzioni seguenti specificano il tipo di query a cui si applicano.
Opzione |
---|
endingOffsets Tipo: String Tipo di query: solo batchGli offset da leggere fino al termine per una query batch, per specificare i record più recenti con "latest" , o una stringa JSON che specifica un offset finale per ogni TopicPartition. Nel codice JSON -1 come offset può essere usato per fare riferimento alla versione più recente.
-2 (più antico) come offset non è consentito.Valore predefinito: "latest" |
endingOffsetsByTimestamp Tipo: String Tipo di query: solo batchStringa JSON che specifica un timestamp finale fino a cui leggere per ogni TopicPartition. I timestamp devono essere forniti come valore di tipo lungo del timestamp in millisecondi a partire da 1970-01-01 00:00:00 UTC , ad esempio1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente .endingOffsetsByTimestamp ha la precedenza rispetto a endingOffsets .Valore predefinito: Nessuno |
endingTimestamp Tipo: String Tipo di query: solo batchValore stringa del timestamp espresso in millisecondi dall'inizio 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Se Kafka non restituisce l'offset corrispondente, l'offset verrà impostato su latest. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nota: endingTimestamp ha la precedenza su endingOffsetsByTimestamp eendingOffsets .Valore predefinito: Nessuno |
includeHeaders Tipo: Boolean Tipo di query: streaming e batchIndica se includere le intestazioni Kafka nella riga. Valore predefinito: false |
kafka.<consumer_option> Tipo: String Tipo di query: streaming e batch.Le opzioni specifiche del consumer Kafka possono essere passate con il prefisso kafka. . Queste opzioni devono essere racchiuse tra backticks quando fornite, altrimenti verrà visualizzato un errore del parser. Le opzioni sono disponibili nella documentazione di Kafka.Nota: non è consigliabile impostare le opzioni seguenti con questa funzione: key.deserializer , value.deserializer , bootstrap.servers group.id Valore predefinito: Nessuno |
maxOffsetsPerTrigger Tipo: Long Tipo di query: solo streamingLimite di velocità per il numero massimo di offset o righe elaborate per ciascun intervallo di attivazione. Il numero totale specificato di offset verrà suddiviso proporzionalmente tra TopicPartitions. Valore predefinito: Nessuno |
startingOffsets Tipo: String Tipo di query: streaming e batchPunto iniziale all'avvio di una query: "earliest" , che parte dagli offset meno recenti, "latest" , che parte solo dagli offset più recenti, oppure una stringa JSON che specifica un offset iniziale per ogni TopicPartition. Nel JSON, -2 come offset può essere usato per fare riferimento all'inizio, mentre -1 alla più recente.Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in JSON) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query. Le query di streaming che sono state riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: "latest" per lo streaming, "earliest" per batch |
startingOffsetsByTimestamp Tipo: String Tipo di query: streaming e batchStringa JSON che specifica un timestamp iniziale per ogni TopicPartition. I timestamp devono essere specificati come valore lungo del timestamp in millisecondi, ad 1970-01-01 00:00:00 UTC esempio 1686444353000 . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento sarà in accordo col valore dell'opzione startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp ha la precedenza rispetto a startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate riprenderanno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto. Valore predefinito: Nessuno |
startingOffsetsByTimestampStrategy Tipo: String Tipo di query: streaming e batchQuesta strategia viene usata quando l'offset iniziale specificato tramite timestamp (sia globale o per partizione) non corrisponde all'offset restituito da Kafka. Le strategie disponibili sono:
Valore predefinito: "error" |
startingTimestamp Tipo: String Tipo di query: streaming e batchvalore in formato stringa del timestamp espresso in millisecondi a partire da 1970-01-01 00:00:00 UTC , ad esempio "1686444353000" . Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce l'offset corrispondente, il comportamento sarà in accordo col valore dell'opzione startingOffsetsByTimestampStrategy .startingTimestamp ha la precedenza su startingOffsetsByTimestamp e startingOffsets .Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno prima. Valore predefinito: Nessuno |
Nota
L'offset restituito per ogni partizione è l'offset più recente il cui timestamp è maggiore o uguale al timestamp fornito nella partizione corrispondente. Il comportamento varia a seconda delle opzioni se Kafka non restituisce l'offset corrispondente. Controllare la descrizione di ogni opzione.
Spark passa semplicemente le informazioni sul timestamp a KafkaConsumer.offsetsForTimes
e non interpreta o ragiona sul valore. Per altri dettagli su KafkaConsumer.offsetsForTimes
, vedere la documentazione. Inoltre, il significato del timestamp qui può variare in base alla configurazione Kafka (log.message.timestamp.type
). Per informazioni dettagliate, vedere la documentazione di Apache Kafka.