Partager via


read_kafka fonction table

S’applique à : coche marquée oui Databricks SQL case marquée oui Databricks Runtime 13.3 LTS et versions ultérieures

Lit les données d’un cluster Apache Kafka et retourne les données sous forme tabulaire.

Peut lire des données d’une ou plusieurs rubriques Kafka. Il prend en charge à la fois les requêtes par lots et l’ingestion en diffusion en continu.

Syntaxe

read_kafka([option_key => option_value ] [, ...])

Arguments

Cette fonction nécessite un appel de paramètre nommé.

  • option_key : le nom de l’option à configurer. Vous devez utiliser des accents graves (`) pour les options qui contiennent des points (.).
  • option_value : une expression constante pour définir l’option. Accepte les fonctions littérales et scalaires.

Retours

Enregistrements lus à partir d’un cluster Apache Kafka avec le schéma suivant :

  • key BINARY : la clé de l’enregistrement Kafka.
  • value BINARY NOT NULL : la valeur de l’enregistrement Kafka.
  • topic STRING NOT NULL : le nom de la rubrique Kafka à partir de laquelle l’enregistrement est lu.
  • partition INT NOT NULL : l’ID de la partition Kafka à partir de laquelle l’enregistrement est lu.
  • offset BIGINT NOT NULL : le numéro de décalage de l’enregistrement dans TopicPartition Kafka.
  • timestamp TIMESTAMP NOT NULL : la valeur de temps pour l’enregistrement. La colonne timestampType définit à quoi correspond ce timestamp.
  • timestampType INTEGER NOT NULL : le type de timestamp spécifié dans la colonne timestamp.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>> : valeurs d’en-tête fournies dans le cadre de l’enregistrement (si cette option est activée).

Exemples

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Options

Vous trouverez une liste détaillée des options dans la documentation Apache Spark.

Options requises

Fournissez l’option ci-dessous pour vous connecter à votre cluster Kafka.

Option
bootstrapServers

Entrez : String

Liste séparée par des virgules de paires hôte/port pointant vers le cluster Kafka.

Valeur par défaut : Aucun

Fournissez une seule des options ci-dessous pour configurer les rubriques Kafka à partir desquelles extraire des données.

Option
assign

Entrez : String

Chaîne JSON qui contient les partitions de rubrique spécifiques à utiliser. Par exemple, pour '{"topicA":[0,1],"topicB":[2,4]}', les partitions 0 et 1 de la rubriqueA seront utilisées.

Valeur par défaut : aucune
subscribe

Entrez : String

Liste séparée par des virgules des rubriques Kafka à lire.

Valeur par défaut : aucune
subscribePattern

Entrez : String

Expression régulière correspondant aux rubriques auxquelles s’abonner.

Valeur par défaut : Aucun

Options diverses

read_kafka peut être utilisé dans les requêtes par lots et dans les requêtes de diffusion en continu. Les options ci-dessous spécifient le type de requête auquel elles s’appliquent.

Option
endingOffsets

Type : Type de requête String : lot uniquement

Décalages à lire pour une requête par lots, soit "latest" pour spécifier les enregistrements les plus récents, soit une chaîne JSON spécifiant un décalage de fin pour chaque TopicPartition. Dans JSON, -1 peut être utilisé pour indiquer le décalage le plus récent. -2 (le plus ancien) car un décalage n’est pas autorisé.

Valeur par défaut : "latest"
endingOffsetsByTimestamp

Type : Type de requête String : lot uniquement

Chaîne JSON spécifiant un timestamp de fin à lire pour chaque TopicPartition. Les timestamps doivent être fournis en tant que valeur longue du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC, par exemple
1686444353000. Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps.
endingOffsetsByTimestamp est prioritaire par rapport à endingOffsets.

Valeur par défaut : Aucun
endingTimestamp

Type : Type de requête String : lot uniquement

Valeur de chaîne du timestamp en millisecondes depuis
1970-01-01 00:00:00 UTC, par exemple "1686444353000". Si Kafka ne retourne pas le décalage correspondant, le décalage est défini sur le plus récent. Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Note : endingTimestamp est prioritaire sur endingOffsetsByTimestamp et
endingOffsets.

Valeur par défaut : aucune
includeHeaders

Type : Type de requête Boolean : diffusion en continu et lot

Indique s’il faut inclure les en-têtes Kafka dans la ligne.

Valeur par défaut : false
kafka.<consumer_option>

Type : Type de requête String : diffusion en continu et lot

Toutes les options spécifiques au consommateur Kafka peuvent être transmises avec le préfixe kafka.. Ces options doivent être entourées d’accents graves lorsqu’elles sont fournies. Sinon, vous obtiendrez une erreur d’analyseur. Vous trouverez les options dans la documentation Kafka.

Note : vous ne devez pas définir les options suivantes avec cette fonction :
key.deserializer, value.deserializer, bootstrap.servers, group.id

Valeur par défaut : Aucun
maxOffsetsPerTrigger

Type : Type de requête Long : diffusion en continu uniquement

Limite de débit sur le nombre maximal de décalages ou lignes traités par intervalle de déclencheur. Le nombre total de décalages spécifié sera réparti de manière proportionnelle entre TopicPartitions.

Valeur par défaut : Aucun
startingOffsets

Type : Type de requête String : diffusion en continu et lot

Point de départ lors du démarrage d’une requête, soit "earliest" qui correspond aux décalages les plus anciens, "latest" qui correspond aux décalages les plus récents, soit une chaîne JSON spécifiant un décalage de départ pour chaque TopicPartition. Dans JSON, -2 peut être utilisé pour indiquer le décalage le plus ancien et -1 le plus récent.

Note : pour les requêtes par lot, le décalage le plus récent (implicitement ou à l’aide de -1 dans JSON) n’est pas autorisé. Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt.

Valeur par défaut : "latest" pour la diffusion en continu, "earliest" pour le lot
startingOffsetsByTimestamp

Type : Type de requête String : diffusion en continu et lot

Chaîne JSON spécifiant un timestamp de départ pour chaque TopicPartition. Les timestamps doivent être fournis en tant que valeur longue du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC, par exemple 1686444353000. Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Si Kafka ne retourne pas le décalage correspondant, le comportement suit la valeur de l’option startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp est prioritaire par rapport à startingOffsets.

Note : Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt.

Valeur par défaut : Aucun
startingOffsetsByTimestampStrategy

Type : Type de requête String : diffusion en continu et lot

Cette stratégie est utilisée lorsque le décalage de départ spécifié par timestamp (global ou par partition) ne correspond pas au décalage Kafka retourné. Les stratégies disponibles sont les suivantes :

- "error" : échec de la requête
- "latest" : attribue le décalage le plus récent pour ces partitions afin que Spark puisse lire les enregistrements plus récents à partir de ces partitions dans des micro-lots ultérieurs.

Valeur par défaut : "error"
startingTimestamp

Type : Type de requête String : diffusion en continu et lot

Valeur de chaîne du timestamp en millisecondes depuis
1970-01-01 00:00:00 UTC, par exemple "1686444353000". Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Si Kafka ne retourne pas le décalage correspondant, le comportement suit la valeur de l’option startingOffsetsByTimestampStrategy.
startingTimestamp est prioritaire sur startingOffsetsByTimestamp et startingOffsets.

Note : Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt.

Valeur par défaut : Aucun

Remarque

Le décalage retourné pour chaque partition est le décalage le plus ancien dont le timestamp est supérieur ou égal au timestamp donné dans la partition correspondante. Le comportement varie selon les options si Kafka ne retourne pas le décalage correspondant. Veuillez vérifier la description de chaque option.

Spark transmet simplement les informations du timestamp à KafkaConsumer.offsetsForTimes, et n’interprète pas ou ne raisonne pas sur la valeur. Pour en savoir plus sur KafkaConsumer.offsetsForTimes, consultez la documentation. En outre, la signification du timestamp ici peut varier en fonction de la configuration Kafka (log.message.timestamp.type). Pour plus d’informations, consultez la documentation Apache Kafka.