read_kafka
functie met tabelwaarde
Van toepassing op: Databricks SQL
Databricks Runtime 13.3 LTS en hoger
Leest gegevens uit een Apache Kafka-cluster en retourneert de gegevens in tabelvorm.
Kan gegevens lezen uit een of meer Kafka-onderwerpen. Het ondersteunt zowel batchquery's als streaming-opname.
Syntaxis
read_kafka([option_key => option_value ] [, ...])
Argumenten
Voor deze functie is aanroepen van benoemde parameters vereist.
-
option_key
: De naam van de optie die u wilt configureren. U moet backticks () for options that contain dots (
.') gebruiken. -
option_value
: Een constante expressie om de optie in te stellen. Accepteert letterlijke en scalaire functies.
Retouren
Records die zijn gelezen uit een Apache Kafka-cluster met het volgende schema:
-
key BINARY
: De sleutel van het Kafka-record. -
value BINARY NOT NULL
: De waarde van de Kafka-record. -
topic STRING NOT NULL
: De naam van het Kafka-onderwerp waaruit de record is gelezen. -
partition INT NOT NULL
: De ID van de Kafka-partitie waaruit het record is gelezen. -
offset BIGINT NOT NULL
: Het offsetnummer van de record in KafkaTopicPartition
. -
timestamp TIMESTAMP NOT NULL
: Een tijdstempelwaarde voor de record. In detimestampType
kolom wordt gedefinieerd waar deze tijdstempel op betrekking heeft. -
timestampType INTEGER NOT NULL
: Het type tijdstempel dat is opgegeven in detimestamp
kolom. -
headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Koptekstwaarden die zijn opgegeven als onderdeel van de record (indien ingeschakeld).
Voorbeelden
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opties
U vindt een gedetailleerde lijst met opties in de Apache Spark-documentatie.
Vereiste opties
Geef de onderstaande optie op om verbinding te maken met uw Kafka-cluster.
Optie |
---|
bootstrapServers Type: String Een door komma's gescheiden lijst met host-/poortparen die verwijzen naar kafka-cluster. Standaardwaarde: Geen |
Geef slechts een van de onderstaande opties op om te configureren uit welke Kafka-onderwerpen gegevens moeten worden opgehaald.
Optie |
---|
assign Type: String Een JSON-tekenreeks die de specifieke topic-partities bevat waaruit moet worden geconsumeerd. Voor topicA worden bijvoorbeeld de 0e en 1e partitie verbruikt. Standaardwaarde: Geen |
subscribe Typ: String Een door komma's gescheiden lijst met Kafka-onderwerpen waaruit moet worden gelezen. Standaardwaarde: Geen |
subscribePattern Type: String Een reguliere expressie die overeenkomt met onderwerpen waarop u zich kunt abonneren. Standaardwaarde: Geen |
Diverse opties
read_kafka
kan worden gebruikt in batch- en streamingquery's. Met de onderstaande opties kunt u opgeven op welk type query ze van toepassing zijn.
Optie |
---|
endingOffsets Type: String Querytype: alleen batchDe offsets die moeten worden gelezen tot voor een batchquery, "latest" om de meest recente records op te geven, of een JSON-tekenreeks die een eindoffset voor elke TopicPartition specificeert. In de JSON kan -1 als een offset worden gebruikt om te verwijzen naar het nieuwste.
-2 (vroegst) als offset is niet toegestaan.Standaardwaarde: "latest" |
endingOffsetsByTimestamp Type: String Querytype: alleen batchEen JSON-tekenreeks die een eindtijdstempel specificeert om tot te lezen voor elke TopicPartition. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC , bijvoorbeeld.1686444353000 . Zie hieronder informatie over het gedrag met tijdstempels.endingOffsetsByTimestamp heeft voorrang op endingOffsets .Standaardwaarde: Geen |
endingTimestamp Type: String Querytype: alleen batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC bijvoorbeeld "1686444353000" . Als Kafka de overeenkomende offset niet retourneert, wordt de offset ingesteld op de meest recente. Zie hieronder informatie over het gedrag met tijdstempels. Opmerking: endingTimestamp heeft voorrang op endingOffsetsByTimestamp enendingOffsets .Standaardwaarde: Geen |
includeHeaders Type: Boolean Querytype: streaming en batchOf u de Kafka-headers in de rij wilt opnemen. Standaardwaarde: false |
kafka.<consumer_option> Type: String Querytype: streaming en batchAlle specifieke opties voor Kafka-consumenten kunnen worden doorgegeven met het kafka. voorvoegsel. Deze opties moeten worden omgeven door backticks wanneer deze worden opgegeven, anders krijgt u een parserfout. U vindt de opties in de Kafka-documentatie.Opmerking: U moet de volgende opties niet instellen met deze functie: key.deserializer , value.deserializer , bootstrap.servers , group.id Standaardwaarde: Geen |
maxOffsetsPerTrigger Type: Long Querytype: alleen streamingFrequentielimiet voor het maximum aantal offsets of rijen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal offsets wordt proportioneel verdeeld over TopicPartitions. Standaardwaarde: Geen |
startingOffsets Type: String Querytype: streaming en batchHet beginpunt waarop een query wordt gestart, "earliest" ofwel van de vroegste offsets, "latest" die alleen afkomstig is van de meest recente offsets, of een JSON-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de JSON kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente.Opmerking: voor batchquery's is de meest recente (impliciet of met behulp van -1 in JSON) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuw ontdekte partities starten tijdens een query op het vroegst mogelijke moment. Standaardwaarde: "latest" voor streaming, "earliest" voor batch |
startingOffsetsByTimestamp Type: String Querytype: streaming en batchEen JSON-tekenreeks die een begintijdstempel voor elke TopicPartition opgeeft. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC , bijvoorbeeld 1686444353000 . Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, zal het gedrag afhankelijk zijn van de waarde van de optie startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp heeft voorrang op startingOffsets .Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streaming queries worden voortgezet vanaf de offsets die zijn gedefinieerd in het controlepunt van de query. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste. Standaardwaarde: Geen |
startingOffsetsByTimestampStrategy Type: String Querytype: streaming en batchDeze strategie wordt gebruikt wanneer de opgegeven startoffset per tijdstempel (globaal of per partitie) niet overeenkomt met de offset die Kafka retourneerde. De beschikbare strategieƫn zijn:
Standaardwaarde: "error" |
startingTimestamp Type: String Query-type: streaming en batchEen tekenreekswaarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC bijvoorbeeld "1686444353000" . Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, zal het gedrag afhangen van de waarde van de optie startingOffsetsByTimestampStrategy .startingTimestamp heeft voorrang op startingOffsetsByTimestamp en startingOffsets .Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Zojuist gedetecteerde partities tijdens een query worden vroegst gestart. Standaardwaarde: Geen |
Notitie
De geretourneerde offset voor elke partitie is de vroegste offset waarvan de tijdstempel groter is dan of gelijk is aan de opgegeven tijdstempel in de bijbehorende partitie. Het gedrag verschilt per optie als Kafka de overeenkomende offset niet retourneert. Controleer de beschrijving van elke optie.
Spark geeft de tijdstempelgegevens gewoon door aan KafkaConsumer.offsetsForTimes
en interpreteert of redeneert niet over de waarde. Raadpleeg de documentatie voor meer details over KafkaConsumer.offsetsForTimes
. Ook kan de betekenis van tijdstempel hier variƫren afhankelijk van de Kafka-configuratie (log.message.timestamp.type
). Zie de Documentatie van Apache Kafka voor meer informatie.