Condividi tramite


funzione a valore read_kafkatable

Si applica a:segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 13.3 LTS e versioni successive

Legge i dati da un cluster Apache Kafka e restituisce i dati in formato tabulare.

Può leggere i dati da uno o più argomenti kafka. Supporta sia le query batch che l'inserimento in streaming.

Sintassi

read_kafka([option_key => option_value ] [, ...])

Argomenti

Questa funzione richiede la chiamata di parametri denominati.

  • option_key: nome dell'opzione da configurare. È necessario utilizzare i backtick (') per le opzioni che contengono punti (.).
  • option_value: Un'espressione costante per determinare l'opzione set. Accetta valori letterali e funzioni scalari.

Valori restituiti

I record letti da un cluster Apache Kafka con il schemaseguente:

  • key BINARY: chiave del record Kafka.
  • value BINARY NOT NULL: valore del record Kafka.
  • topic STRING NOT NULL: nome dell'argomento Kafka da cui viene letto il record.
  • partition INT NOT NULL: L'ID di Kafka partition da cui viene letta la registrazione.
  • offset BIGINT NOT NULL: il numero del record offset nel sistema TopicPartitionKafka.
  • timestamp TIMESTAMP NOT NULL: valore timestamp per il record. Il timestampTypecolumn definisce a cosa corrisponde questo timestamp.
  • timestampType INTEGER NOT NULL: tipo del timestamp specificato nel timestampcolumn.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: intestazione values fornita come parte del record (se abilitata).

Esempi

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opzioni

È possibile trovare un list dettagliato delle opzioni nella documentazione di Apache Spark .

Opzioni obbligatorie

Fornire l'opzione seguente per la connessione al cluster Kafka.

Opzione
bootstrapServers

Tipo: String

Un elenco list delimitato da virgole di coppie di host/porta che puntano al cluster Kafka.

Valore predefinito: Nessuno

Fornire solo una delle opzioni seguenti per configurare gli argomenti Kafka da cui eseguire il pull dei dati.

Opzione
assign

Tipo: String

Stringa JSON che contiene le partizioni di argomento specifiche da utilizzare. Ad esempio, per '{"topicA":[0,1],"topicB":[2,4]}'le partizioni 0 e 1° di topicA verranno utilizzate.

Valore predefinito: Nessuno
subscribe

Tipo: String

Un list delimitato da virgole di argomenti Kafka da cui leggere.

Valore predefinito: Nessuno
subscribePattern

Tipo: String

Argomenti corrispondenti a un'espressione regolare a cui sottoscrivere.

Valore predefinito: Nessuno

Altre opzioni

read_kafka può essere usato nelle query batch e nelle query di streaming. Le opzioni seguenti specificano il tipo di query a cui si applicano.

Opzione
endingOffsets

Tipo: Tipo di query: String solo batch

Gli offset da leggere per una query batch fino a quando non si arriva a "latest" per specificare i record più recenti, o una stringa JSON che specifica un offset di chiusura per ogni TopicPartition. In JSON -1 come offset può essere usato per fare riferimento alla versione più recente. -2 (più antico) come offset non è consentito.

Valore predefinito: "latest"
endingOffsetsByTimestamp

Tipo: Tipo di query: String solo batch

Stringa JSON che specifica un timestamp finale da leggere fino a quando non per ogni TopicPartition. I timestamp devono essere forniti come valore lungo del timestamp in millisecondi, ad esempio ,1970-01-01 00:00:00 UTC
1686444353000. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente .
endingOffsetsByTimestamp ha la precedenza rispetto a endingOffsets.

Valore predefinito: Nessuno
endingTimestamp

Tipo: Tipo di query: String solo batch

Valore stringa del timestamp espresso in millisecondi da
1970-01-01 00:00:00 UTC, ad esempio "1686444353000". Se Kafka non restituisce l'offsetcorrispondente, il offset verrà set alla versione più recente. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nota: endingTimestamp ha la precedenza su endingOffsetsByTimestamp e
endingOffsets.

Valore predefinito: Nessuno
includeHeaders

Tipo: Tipo di query: Boolean streaming e batch

Indica se includere le intestazioni Kafka nella riga.

Valore predefinito: false
kafka.<consumer_option>

Tipo: Tipo di query: String streaming e batch

Qualsiasi opzione specifica del consumer Kafka può essere passata con il kafka. prefisso . Queste opzioni devono essere racchiuse da backtick quando specificato; in caso contrario, si get un errore del parser. Le opzioni sono disponibili nella documentazione di Kafka.

Nota: non è consigliabile set le opzioni seguenti con questa funzione:
key.deserializer, value.deserializer, bootstrap.serversgroup.id

Valore predefinito: Nessuno
maxOffsetsPerTrigger

Tipo: Tipo di query: Long solo streaming

Valore limit del numero massimo di offset o righe elaborate per l'intervallo di trigger. Il numero totale specificato di offset verrà suddiviso proporzionalmente tra TopicPartitions.

Valore predefinito: Nessuno
startingOffsets

Tipo: Tipo di query: String streaming e batch

Quando si avvia una query, il punto di partenza è "earliest", che proviene dagli offset meno recenti, "latest" che proviene solo dagli offset più recenti, o una stringa JSON che specifica un offset iniziale per ogni TopicPartition. Nel codice JSON, -2 come offset può essere usato per fare riferimento alla versione più antica, -1 alla più recente.

Nota: per le query batch, la versione più recente (implicitamente o tramite -1 in JSON) non è consentita. Per le query di streaming, questo vale solo quando viene avviata una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto.

Valore predefinito: "latest" per lo streaming, "earliest" per batch
startingOffsetsByTimestamp

Tipo: Tipo di query: String streaming e batch

Stringa JSON che specifica un timestamp iniziale per ogni TopicPartition. I timestamp devono essere specificati come valore lungo del timestamp in millisecondi, ad 1970-01-01 00:00:00 UTCesempio 1686444353000. Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Se Kafka non restituisce il offsetcorrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp ha la precedenza rispetto a startingOffsets.

Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno al più presto.

Valore predefinito: Nessuno
startingOffsetsByTimestampStrategy

Tipo: Tipo di query: String streaming e batch

Questa strategia viene usata quando l'offset iniziale specificato dal timestamp, sia a livello globale che per partition, non corrisponde a quello restituito da Kafka per offset. Le strategie disponibili sono:

- "error": non è possibile eseguire la query
- "latest": assegna le offset più recenti a queste partizioni affinché Spark possa leggere i record più nuovi da esse nei micro-batch successivi.

Valore predefinito: "error"
startingTimestamp

Tipo: Tipo di query: String streaming e batch

Valore stringa del timestamp espresso in millisecondi da
1970-01-01 00:00:00 UTC, ad esempio "1686444353000". Per informazioni dettagliate sul comportamento con timestamp, vedere la nota seguente . Nel caso in cui Kafka non restituisca il offsetcorrispondente, il comportamento seguirà il valore dell'opzione startingOffsetsByTimestampStrategy.
startingTimestamp ha la precedenza su startingOffsetsByTimestamp e startingOffsets.

Nota: per le query di streaming, questa operazione si applica solo all'avvio di una nuova query. Le query di streaming riavviate continueranno dagli offset definiti nel checkpoint della query. Le nuove partizioni individuate durante una query inizieranno prima.

Valore predefinito: Nessuno

Nota

Il offset restituito per ogni partition è il primo offset il cui timestamp è maggiore o uguale al timestamp specificato nel partitioncorrispondente. Il comportamento varia tra le opzioni se Kafka non restituisce il offset abbinato; in tal caso, controllare la descrizione di ogni opzione.

Spark passa semplicemente le informazioni sul timestamp a KafkaConsumer.offsetsForTimese non interpreta o ragiona sul valore. Per altri dettagli su KafkaConsumer.offsetsForTimes, vedere la documentazione. Inoltre, il significato del timestamp qui può variare in base alla configurazione Kafka (log.message.timestamp.type). Per informazioni dettagliate, vedere la documentazione di Apache Kafka.