Dela via


read_kafka table-värdefunktion

Gäller för:markerad ja Databricks SQL markerad ja Databricks Runtime 13.3 LTS och senare

Läser data från ett Apache Kafka-kluster och returnerar data i tabellformat.

Kan läsa data från ett eller flera Kafka-ämnen. Den stöder både batchfrågor och strömmande inmatning.

Syntax

read_kafka([option_key => option_value ] [, ...])

Argument

Den här funktionen kräver namngivna parameteranrop.

  • option_key: Namnet på alternativet som ska konfigureras. Du måste använda backticks (') för alternativ som innehåller punkter (.).
  • option_value: Ett konstant uttryck för att ställa in alternativet set. Accepterar literaler och skalärfunktioner.

Returer

Poster lästa från ett Apache Kafka-kluster med följande schema:

  • key BINARY: Nyckeln för Kafka-posten.
  • value BINARY NOT NULL: Värdet för Kafka-posten.
  • topic STRING NOT NULL: Namnet på kafka-ämnet som posten läse från.
  • partition INT NOT NULL: ID:t för Kafka-partition-posten från vilken posten läses.
  • offset BIGINT NOT NULL: Postens offset nummer i Kafka-TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Ett tidsstämpelvärde för posten. timestampType column definierar vad den här tidsstämpeln motsvarar.
  • timestampType INTEGER NOT NULL: Typen av tidsstämpel som anges i timestampcolumn.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Rubrik values finns med som en del av posten (om den är aktiverad).

Exempel

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Alternativ

Du hittar en detaljerad förteckning list över alternativ i Apache Spark-dokumentationen.

Obligatoriska alternativ

Ange alternativet nedan för att ansluta till ditt Kafka-kluster.

Alternativ
bootstrapServers

Typ: String

Ett kommaavgränsat list värd-/portpar som pekar mot Kafka-klustret.

Standardvärde: Ingen

Ange endast ett av alternativen nedan för att konfigurera vilka Kafka-ämnen som du vill hämta data från.

Alternativ
assign

Typ: String

En JSON-sträng som innehåller de specifika ämnespartitioner som ska användas från. Till exempel '{"topicA":[0,1],"topicB":[2,4]}'används topicA:s 0:e och 1:a partitioner från.

Standardvärde: Ingen
subscribe

Typ: String

Ett kommaavgränsat list kafka-ämnen att läsa från.

Standardvärde: Ingen
subscribePattern

Typ: String

Ett reguljärt uttryck som matchar ämnen att prenumerera på.

Standardvärde: Ingen

Diverse alternativ

read_kafka kan användas i batchfrågor och i strömmande frågor. Alternativen nedan anger vilken typ av fråga de gäller för.

Alternativ
endingOffsets

Typ: String Frågetyp: endast batch

Förskjutningar för att läsa tills för en batchfråga, antingen "latest" för att ange de senaste posterna eller använda en JSON-sträng som anger ett slut offset för varje TopicPartition. I JSON kan -1 som en offset användas för att hänvisa till den senaste. -2 (tidigast) som offset är inte tillåtet.

Standardvärde: "latest"
endingOffsetsByTimestamp

Typ: String Frågetyp: endast batch

En JSON-sträng som anger en sluttidsstämpel som ska läsas till för varje TopicPartition. Tidsstämplarna måste anges som ett långt värde för tidsstämpeln i millisekunder sedan 1970-01-01 00:00:00 UTC, till exempel
1686444353000. Se kommentaren nedan om information om beteende med tidsstämplar.
endingOffsetsByTimestamp har företräde framför endingOffsets.

Standardvärde: Ingen
endingTimestamp

Typ: String Frågetyp: endast batch

Ett strängvärde för tidsstämpeln i millisekunder sedan
1970-01-01 00:00:00 UTC, till exempel "1686444353000". Om Kafka inte returnerar den matchade offsetblir offsetset till den senaste. Se kommentaren nedan om information om beteende med tidsstämplar. Obs! endingTimestamp har företräde framför endingOffsetsByTimestamp och
endingOffsets.

Standardvärde: Ingen
includeHeaders

Typ: Boolean Frågetyp: direktuppspelning och batch

Om Kafka-rubrikerna ska inkluderas på raden.

Standardvärde: false
kafka.<consumer_option>

Typ: String Frågetyp: direktuppspelning och batch

Alla Kafka-konsumentspecifika alternativ kan skickas med prefixet kafka. . Dessa alternativ måste omges av backticks när de tillhandahålls, annars get ett parserfel. Du hittar alternativen i Kafka-dokumentationen.

Obs! Du bör inte set följande alternativ med den här funktionen:
key.deserializer, value.deserializer, , bootstrap.serversgroup.id

Standardvärde: Ingen
maxOffsetsPerTrigger

Typ: Long Frågetyp: endast direktuppspelning

Hastighet limit på det maximala antalet förskjutningar eller rader som bearbetas per utlösarintervall. Det angivna totala antalet förskjutningar delas proportionellt mellan TopicPartitions.

Standardvärde: Ingen
startingOffsets

Typ: String Frågetyp: direktuppspelning och batch

Startpunkten när en fråga startas, antingen "earliest" som är från de tidigaste förskjutningarna, "latest" som bara är från de senaste förskjutningarna eller en JSON-sträng som anger en start-offset för varje TopicPartition. I JSON-filer kan -2 som offset användas för att referera från tidigaste -1 till senaste.

Obs! För batchfrågor tillåts inte den senaste (antingen implicit eller med hjälp av -1 i JSON). För direktuppspelningsfrågor gäller detta bara när en ny fråga startas. Omstartade direktuppspelningsfrågor fortsätter från de förskjutningar som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.

Standardvärde: "latest" för direktuppspelning, "earliest" för batch
startingOffsetsByTimestamp

Typ: String Frågetyp: direktuppspelning och batch

En JSON-sträng som anger en starttidsstämpel för varje TopicPartition. Tidsstämplarna måste anges som ett långt värde för tidsstämpeln i millisekunder sedan 1970-01-01 00:00:00 UTC, till exempel 1686444353000. Se kommentaren nedan om information om beteende med tidsstämplar. Om Kafka inte returnerar den matchade offset, kommer beteendet att följa värdet för alternativet startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp har företräde framför startingOffsets.

Obs! För strömmande frågor gäller detta bara när en ny fråga startas. Omstartade direktuppspelningsfrågor fortsätter från de förskjutningar som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.

Standardvärde: Ingen
startingOffsetsByTimestampStrategy

Typ: String Frågetyp: direktuppspelning och batch

Den här strategin används när den angivna start offset av tidsstämpel (antingen global eller per partition) inte matchar med den offset Kafka returneras. De tillgängliga strategierna är:

- "error": misslyckas med frågan
- "latest": tilldelar den senaste offset för dessa partitioner så att Spark kan läsa nyare poster från dessa partitioner i senare mikrobatch.

Standardvärde: "error"
startingTimestamp

Typ: String Frågetyp: direktuppspelning och batch

Ett strängvärde för tidsstämpeln i millisekunder sedan
1970-01-01 00:00:00 UTC, till exempel "1686444353000". Se kommentaren nedan om information om beteende med tidsstämplar. Om Kafka inte returnerar den matchade offset, kommer beteendet följa värdet av alternativet startingOffsetsByTimestampStrategy.
startingTimestamp har företräde framför startingOffsetsByTimestamp och startingOffsets.

Obs! För strömmande frågor gäller detta bara när en ny fråga startas. Omstartade direktuppspelningsfrågor fortsätter från de förskjutningar som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.

Standardvärde: Ingen

Kommentar

Den returnerade offset för varje partition är den tidigaste offset vars tidsstämpel är större än eller lika med den angivna tidsstämpeln i motsvarande partition. Beteendet varierar mellan olika alternativ om Kafka inte returnerar den matchade offset – kontrollera beskrivningen av varje alternativ.

Spark skickar helt enkelt tidsstämpelinformationen till KafkaConsumer.offsetsForTimesoch tolkar inte eller resonerar inte om värdet. Mer information om KafkaConsumer.offsetsForTimesfinns i dokumentationen. Dessutom kan betydelsen av tidsstämpeln här variera beroende på Kafka-konfigurationen (log.message.timestamp.type). Mer information finns i Apache Kafka-dokumentationen.