Delen via


read_kafka functie met tabelwaarde

Van toepassing op:vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Leest gegevens uit een Apache Kafka-cluster en retourneert de gegevens in tabelvorm.

Kan gegevens lezen uit een of meer Kafka-onderwerpen. Het ondersteunt zowel batchquery's als streaming-opname.

Syntaxis

read_kafka([option_key => option_value ] [, ...])

Argumenten

Voor deze functie is aanroepen van benoemde parameters vereist.

  • option_key: De naam van de optie die u wilt configureren. U moet backticks () for options that contain dots (.') gebruiken.
  • option_value: Een constante expressie om de optie in te stellen. Accepteert letterlijke en scalaire functies.

Retouren

Records die zijn gelezen uit een Apache Kafka-cluster met het volgende schema:

  • key BINARY: De sleutel van het Kafka-record.
  • value BINARY NOT NULL: De waarde van de Kafka-record.
  • topic STRING NOT NULL: De naam van het Kafka-onderwerp waaruit de record is gelezen.
  • partition INT NOT NULL: De ID van de Kafka-partitie waaruit het record is gelezen.
  • offset BIGINT NOT NULL: Het offsetnummer van de record in Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: Een tijdstempelwaarde voor de record. In de timestampType kolom wordt gedefinieerd waar deze tijdstempel op betrekking heeft.
  • timestampType INTEGER NOT NULL: Het type tijdstempel dat is opgegeven in de timestamp kolom.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: Koptekstwaarden die zijn opgegeven als onderdeel van de record (indien ingeschakeld).

Voorbeelden

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opties

U vindt een gedetailleerde lijst met opties in de Apache Spark-documentatie.

Vereiste opties

Geef de onderstaande optie op om verbinding te maken met uw Kafka-cluster.

Optie
bootstrapServers
Type: String
Een door komma's gescheiden lijst met host-/poortparen die verwijzen naar kafka-cluster.
Standaardwaarde: Geen

Geef slechts een van de onderstaande opties op om te configureren uit welke Kafka-onderwerpen gegevens moeten worden opgehaald.

Optie
assign
Type: String
Een JSON-tekenreeks die de specifieke topic-partities bevat waaruit moet worden geconsumeerd. Voor topicA worden bijvoorbeeld de 0e en 1e partitie verbruikt.
Standaardwaarde: Geen
subscribe
Typ: String
Een door komma's gescheiden lijst met Kafka-onderwerpen waaruit moet worden gelezen.
Standaardwaarde: Geen
subscribePattern
Type: String
Een reguliere expressie die overeenkomt met onderwerpen waarop u zich kunt abonneren.
Standaardwaarde: Geen

Diverse opties

read_kafka kan worden gebruikt in batch- en streamingquery's. Met de onderstaande opties kunt u opgeven op welk type query ze van toepassing zijn.

Optie
endingOffsets
Type: String Querytype: alleen batch
De offsets die moeten worden gelezen tot voor een batchquery, "latest" om de meest recente records op te geven, of een JSON-tekenreeks die een eindoffset voor elke TopicPartition specificeert. In de JSON kan -1 als een offset worden gebruikt om te verwijzen naar het nieuwste. -2 (vroegst) als offset is niet toegestaan.
Standaardwaarde: "latest"
endingOffsetsByTimestamp
Type: String Querytype: alleen batch
Een JSON-tekenreeks die een eindtijdstempel specificeert om tot te lezen voor elke TopicPartition. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC, bijvoorbeeld.
1686444353000. Zie hieronder informatie over het gedrag met tijdstempels.
endingOffsetsByTimestamp heeft voorrang op endingOffsets.
Standaardwaarde: Geen
endingTimestamp
Type: String Querytype: alleen batch
Een tekenreekswaarde van de tijdstempel in milliseconden sinds
1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Als Kafka de overeenkomende offset niet retourneert, wordt de offset ingesteld op de meest recente. Zie hieronder informatie over het gedrag met tijdstempels. Opmerking: endingTimestamp heeft voorrang op endingOffsetsByTimestamp en
endingOffsets.
Standaardwaarde: Geen
includeHeaders
Type: Boolean Querytype: streaming en batch
Of u de Kafka-headers in de rij wilt opnemen.
Standaardwaarde: false
kafka.<consumer_option>
Type: String Querytype: streaming en batch
Alle specifieke opties voor Kafka-consumenten kunnen worden doorgegeven met het kafka. voorvoegsel. Deze opties moeten worden omgeven door backticks wanneer deze worden opgegeven, anders krijgt u een parserfout. U vindt de opties in de Kafka-documentatie.
Opmerking: U moet de volgende opties niet instellen met deze functie:
key.deserializer, value.deserializer, bootstrap.servers, group.id
Standaardwaarde: Geen
maxOffsetsPerTrigger
Type: Long Querytype: alleen streaming
Frequentielimiet voor het maximum aantal offsets of rijen dat per triggerinterval wordt verwerkt. Het opgegeven totale aantal offsets wordt proportioneel verdeeld over TopicPartitions.
Standaardwaarde: Geen
startingOffsets
Type: String Querytype: streaming en batch
Het beginpunt waarop een query wordt gestart, "earliest" ofwel van de vroegste offsets, "latest" die alleen afkomstig is van de meest recente offsets, of een JSON-tekenreeks die een beginverschil voor elke TopicPartition opgeeft. In de JSON kan -2 als offset worden gebruikt om te verwijzen naar de vroegste, -1 naar de meest recente.
Opmerking: voor batchquery's is de meest recente (impliciet of met behulp van -1 in JSON) niet toegestaan. Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Nieuw ontdekte partities starten tijdens een query op het vroegst mogelijke moment.
Standaardwaarde: "latest" voor streaming, "earliest" voor batch
startingOffsetsByTimestamp
Type: String Querytype: streaming en batch
Een JSON-tekenreeks die een begintijdstempel voor elke TopicPartition opgeeft. De tijdstempels moeten worden opgegeven als een lange waarde van de tijdstempel in milliseconden sinds 1970-01-01 00:00:00 UTC, bijvoorbeeld 1686444353000. Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, zal het gedrag afhankelijk zijn van de waarde van de optie startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp heeft voorrang op startingOffsets.
Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streaming queries worden voortgezet vanaf de offsets die zijn gedefinieerd in het controlepunt van de query. Nieuwe gedetecteerde partities tijdens een query beginnen ten vroegste.
Standaardwaarde: Geen
startingOffsetsByTimestampStrategy
Type: String Querytype: streaming en batch
Deze strategie wordt gebruikt wanneer de opgegeven startoffset per tijdstempel (globaal of per partitie) niet overeenkomt met de offset die Kafka retourneerde. De beschikbare strategieƫn zijn:
  • "error": laat de query mislukken
  • "latest": wijst de meest recente offset voor deze partities toe, zodat Spark nieuwere records van deze partities in latere microbatches kan lezen.

Standaardwaarde: "error"
startingTimestamp
Type: String Query-type: streaming en batch
Een tekenreekswaarde van de tijdstempel in milliseconden sinds
1970-01-01 00:00:00 UTCbijvoorbeeld "1686444353000". Zie hieronder informatie over het gedrag met tijdstempels. Als Kafka de overeenkomende offset niet retourneert, zal het gedrag afhangen van de waarde van de optie startingOffsetsByTimestampStrategy.
startingTimestamp heeft voorrang op startingOffsetsByTimestamp en startingOffsets.
Opmerking: Voor streamingquery's geldt dit alleen wanneer een nieuwe query wordt gestart. Opnieuw gestarte streamingquery's worden voortgezet vanaf de offsets die zijn gedefinieerd in het querycontrolepunt. Zojuist gedetecteerde partities tijdens een query worden vroegst gestart.
Standaardwaarde: Geen

Notitie

De geretourneerde offset voor elke partitie is de vroegste offset waarvan de tijdstempel groter is dan of gelijk is aan de opgegeven tijdstempel in de bijbehorende partitie. Het gedrag verschilt per optie als Kafka de overeenkomende offset niet retourneert. Controleer de beschrijving van elke optie.

Spark geeft de tijdstempelgegevens gewoon door aan KafkaConsumer.offsetsForTimesen interpreteert of redeneert niet over de waarde. Raadpleeg de documentatie voor meer details over KafkaConsumer.offsetsForTimes. Ook kan de betekenis van tijdstempel hier variƫren afhankelijk van de Kafka-configuratie (log.message.timestamp.type). Zie de Documentatie van Apache Kafka voor meer informatie.