Dela via


read_kafka tabellvärdesfunktion

Gäller för:markerad ja Databricks SQL markerad ja Databricks Runtime 13.3 LTS och senare

Läser data från ett Apache Kafka-kluster och returnerar data i tabellformat.

Kan läsa data från ett eller flera Kafka-ämnen. Den stöder både batchfrågor och strömmande inmatning.

Syntax

read_kafka([option_key => option_value ] [, ...])

Argumenten

Den här funktionen kräver namngivna parameteranrop.

  • option_key: Namnet på alternativet som ska konfigureras. Du måste använda bakåtvända apostrofer () for options that contain dots (.`).
  • option_value: Ett konstant uttryck för att ange alternativet. Accepterar literaler och skalärfunktioner.

Returer

Poster som lästs från ett Apache Kafka-kluster med följande schema:

  • key BINARY: Nyckeln till Kafka-meddelandet.
  • value BINARY NOT NULL: Värdet för Kafka-meddelandet.
  • topic STRING NOT NULL: Namnet på Kafka-ämnet som posten läses från.
  • partition INT NOT NULL: ID för Kafka-partitionen som meddelandet läses från.
  • offset BIGINT NOT NULL: Offsetnumret för posten i Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Tidsstämpelvärdet för posten. Kolumnen timestampType definierar vad den här tidsstämpeln motsvarar.
  • timestampType INTEGER NOT NULL: Typen av tidsstämpel som anges i kolumnen timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Rubrikvärden som anges som en del av posten (om det är aktiverat).

Exempel

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Alternativ

Du hittar en detaljerad lista över alternativ i Apache Spark-dokumentationen.

Obligatoriska alternativ

Ange alternativet nedan för att ansluta till ditt Kafka-kluster.

Alternativ
bootstrapServers
Typ: String
En kommaavgränsad lista över värd-/portpar som pekar på Kafka-kluster.
Standardvärde: Ingen

Ange endast ett av alternativen nedan för att konfigurera vilka Kafka-ämnen som du vill hämta data från.

Alternativ
assign
Typ: String
En JSON-sträng som innehåller de specifika ämnespartitioner som ska användas från. Till exempel, för '{"topicA":[0,1],"topicB":[2,4]}', kommer topicA:s 0:e och 1:a partitioner att hämtas från.
Standardvärde: Ingen
subscribe
Typ: String
En kommaavgränsad lista över Kafka-ämnen att läsa från.
Standardvärde: Ingen
subscribePattern
Typ: String
Ett reguljärt uttryck som matchar ämnen att prenumerera på.
Standardvärde: Ingen

Diverse alternativ

read_kafka kan användas i batchfrågor och i strömmande frågor. Alternativen nedan anger vilken typ av fråga de gäller för.

Alternativ
endingOffsets
Typ: String Frågetyp: endast batch
Förskjutningarna som används för att läsa fram till en viss punkt för en satsvis fråga, antingen "latest" för att ange de senaste posterna eller en JSON-sträng som anger en slutförskjutning för varje TopicPartition. I JSON-dokumentet kan -1 som offset användas för att referera till den senaste versionen. -2 (tidigast) tillåts inte som förskjutning.
Standardvärde: "latest"
endingOffsetsByTimestamp
Typ: String Frågetyp: endast batch
En JSON-sträng som anger en sluttidsstämpel för läsning fram till för varje TopicPartition. Tidsstämplarna måste anges som ett långt värde för tidsstämpeln i millisekunder sedan 1970-01-01 00:00:00 UTC, till exempel
1686444353000. Se kommentaren nedan om information om beteende med tidsstämplar.
endingOffsetsByTimestamp har företräde framför endingOffsets.
Standardvärde: Ingen
endingTimestamp
Typ: String Frågetyp: endast batch-bearbetning
Ett strängvärde för tidsstämpeln i millisekunder sedan
1970-01-01 00:00:00 UTC, till exempel "1686444353000". Om Kafka inte returnerar den matchade offseten kommer offseten att ställas in på det senaste värdet. Se kommentaren nedan om information om beteende med tidsstämplar. Obs! endingTimestamp har företräde framför endingOffsetsByTimestamp och
endingOffsets.
Standardvärde: Ingen
includeHeaders
Typ: Boolean Frågetyp: direktuppspelning och batch
Om Kafka-rubrikerna ska tas med i raden.
Standardvärde: false
kafka.<consumer_option>
Typ: String Frågetyp: direktuppspelning och batch
Alla Kafka-konsumentspecifika alternativ kan skickas med prefixet kafka. . Dessa alternativ måste omges av backticks när de tillhandahålls, annars får du ett parser-fel. Du hittar alternativen i Kafka-dokumentationen.
Obs! Du bör inte ange följande alternativ med den här funktionen:
key.deserializer, value.deserializer, , bootstrap.serversgroup.id
Standardvärde: Ingen
maxOffsetsPerTrigger
Typ: Long Frågetyp: endast direktuppspelning
Hastighetsgräns för det maximala antalet offset eller rader som hanteras per triggningsintervall. Det angivna totala antalet förskjutningar delas proportionellt mellan TopicPartitions.
Standardvärde: Ingen
startingOffsets
Typ: String Frågetyp: direktuppspelning och batch
Startpunkten när en fråga startas, antingen "earliest" som är från de tidigaste förskjutningarna, "latest" som bara är från de senaste förskjutningarna eller en JSON-sträng som anger en startförskjutning för varje TopicPartition. I JSON kan -2 användas som ett offset för att referera till den tidigaste, -1 till den senaste.
Obs! För batchfrågor tillåts inte den senaste (antingen implicit eller med hjälp av -1 i JSON). För direktuppspelningsfrågor gäller detta bara när en ny fråga startas. Omstartade strömningsfrågor fortsätter från de offseter som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.
Standardvärde: "latest" för direktuppspelning, "earliest" för batch
startingOffsetsByTimestamp
Typ: String Frågetyp: direktuppspelning och batch
En JSON-sträng som anger en starttidsstämpel för varje TopicPartition. Tidsstämplarna måste anges som ett långt värde för tidsstämpeln i millisekunder sedan 1970-01-01 00:00:00 UTC, till exempel 1686444353000. Se kommentaren nedan om information om beteende med tidsstämplar. Om Kafka inte returnerar den matchade offseten kommer beteendet att följa värdet för alternativet startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp har företräde framför startingOffsets.
Obs! För strömmande frågor gäller detta bara när en ny fråga startas. Omstartade strömmande frågor fortsätter från de startpositioner som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.
Standardvärde: Ingen
startingOffsetsByTimestampStrategy
Typ: String Frågetyp: direktuppspelning och batch
Den här strategin används när den angivna startförskjutningen av tidsstämpeln (antingen global eller per partition) inte överensstämmer med förskjutningen som Kafka returnerade. De tillgängliga strategierna är:
  • "error": misslyckas med frågan
  • "latest": tilldelar den senaste offseten för dessa partitioner för att Spark ska kunna läsa de senaste posterna från dessa partitioner i senare mikrobatcher.

Standardvärde: "error"
startingTimestamp
Typ: String Frågetyp: direktuppspelning och batch
Ett strängvärde för tidsstämpeln i millisekunder sedan
1970-01-01 00:00:00 UTC, till exempel "1686444353000". Se kommentaren nedan om information om beteende med tidsstämplar. Om Kafka inte returnerar den matchade förskjutningen, kommer beteendet att följa alternativets värde startingOffsetsByTimestampStrategy.
startingTimestamp har företräde framför startingOffsetsByTimestamp och startingOffsets.
Obs! För strömmande frågor gäller detta bara när en ny fråga startas. Omstartade direktuppspelningsfrågor fortsätter från de förskjutningar som definierats i frågekontrollpunkten. Nyligen identifierade partitioner under en fråga startar tidigast.
Standardvärde: Ingen

Kommentar

Den returnerade förskjutningen för varje partition är den tidigaste förskjutningen vars tidsstämpel är större än eller lika med den angivna tidsstämpeln i motsvarande partition. Beteendet varierar mellan olika alternativ om Kafka inte returnerar den matchade förskjutningen – kontrollera beskrivningen av varje alternativ.

Spark skickar helt enkelt tidsstämpelinformationen till KafkaConsumer.offsetsForTimesoch tolkar inte eller resonerar inte om värdet. Mer information om KafkaConsumer.offsetsForTimesfinns i dokumentationen. Dessutom kan betydelsen av tidsstämpeln här variera beroende på Kafka-konfigurationen (log.message.timestamp.type). Mer information finns i Apache Kafka-dokumentationen.