read_kafka
fonction table
S’applique à : Databricks SQL Databricks Runtime 13.3 LTS et versions ultérieures
Lit les données d’un cluster Apache Kafka et retourne les données sous forme tabulaire.
Peut lire des données d’une ou plusieurs rubriques Kafka. Il prend en charge à la fois les requêtes par lots et l’ingestion en diffusion en continu.
Syntaxe
read_kafka([option_key => option_value ] [, ...])
Arguments
Cette fonction nécessite un appel de paramètre nommé.
option_key
: le nom de l’option à configurer. Vous devez utiliser des accents graves (`) pour les options qui contiennent des points (.
).option_value
: une expression constante pour définir l’option. Accepte les fonctions littérales et scalaires.
Retours
Enregistrements lus à partir d’un cluster Apache Kafka avec le schéma suivant :
key BINARY
: la clé de l’enregistrement Kafka.value BINARY NOT NULL
: la valeur de l’enregistrement Kafka.topic STRING NOT NULL
: le nom de la rubrique Kafka à partir de laquelle l’enregistrement est lu.partition INT NOT NULL
: l’ID de la partition Kafka à partir de laquelle l’enregistrement est lu.offset BIGINT NOT NULL
: le numéro de décalage de l’enregistrement dansTopicPartition
Kafka.timestamp TIMESTAMP NOT NULL
: la valeur de temps pour l’enregistrement. La colonnetimestampType
définit à quoi correspond ce timestamp.timestampType INTEGER NOT NULL
: le type de timestamp spécifié dans la colonnetimestamp
.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: valeurs d’en-tête fournies dans le cadre de l’enregistrement (si cette option est activée).
Exemples
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Options
Vous trouverez une liste détaillée des options dans la documentation Apache Spark.
Options requises
Fournissez l’option ci-dessous pour vous connecter à votre cluster Kafka.
Option |
---|
bootstrapServers Entrez : String Liste séparée par des virgules de paires hôte/port pointant vers le cluster Kafka. Valeur par défaut : Aucun |
Fournissez une seule des options ci-dessous pour configurer les rubriques Kafka à partir desquelles extraire des données.
Option |
---|
assign Entrez : String Chaîne JSON qui contient les partitions de rubrique spécifiques à utiliser. Par exemple, pour '{"topicA":[0,1],"topicB":[2,4]}' , les partitions 0 et 1 de la rubriqueA seront utilisées.Valeur par défaut : aucune |
subscribe Entrez : String Liste séparée par des virgules des rubriques Kafka à lire. Valeur par défaut : aucune |
subscribePattern Entrez : String Expression régulière correspondant aux rubriques auxquelles s’abonner. Valeur par défaut : Aucun |
Options diverses
read_kafka
peut être utilisé dans les requêtes par lots et dans les requêtes de diffusion en continu. Les options ci-dessous spécifient le type de requête auquel elles s’appliquent.
Option |
---|
endingOffsets Type : Type de requête String : lot uniquementDécalages à lire pour une requête par lots, soit "latest" pour spécifier les enregistrements les plus récents, soit une chaîne JSON spécifiant un décalage de fin pour chaque TopicPartition. Dans JSON, -1 peut être utilisé pour indiquer le décalage le plus récent. -2 (le plus ancien) car un décalage n’est pas autorisé.Valeur par défaut : "latest" |
endingOffsetsByTimestamp Type : Type de requête String : lot uniquementChaîne JSON spécifiant un timestamp de fin à lire pour chaque TopicPartition. Les timestamps doivent être fournis en tant que valeur longue du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC , par exemple1686444353000 . Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps.endingOffsetsByTimestamp est prioritaire par rapport à endingOffsets .Valeur par défaut : Aucun |
endingTimestamp Type : Type de requête String : lot uniquementValeur de chaîne du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC , par exemple "1686444353000" . Si Kafka ne retourne pas le décalage correspondant, le décalage est défini sur le plus récent. Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Note : endingTimestamp est prioritaire sur endingOffsetsByTimestamp etendingOffsets .Valeur par défaut : aucune |
includeHeaders Type : Type de requête Boolean : diffusion en continu et lotIndique s’il faut inclure les en-têtes Kafka dans la ligne. Valeur par défaut : false |
kafka.<consumer_option> Type : Type de requête String : diffusion en continu et lotToutes les options spécifiques au consommateur Kafka peuvent être transmises avec le préfixe kafka. . Ces options doivent être entourées d’accents graves lorsqu’elles sont fournies. Sinon, vous obtiendrez une erreur d’analyseur. Vous trouverez les options dans la documentation Kafka.Note : vous ne devez pas définir les options suivantes avec cette fonction : key.deserializer , value.deserializer , bootstrap.servers , group.id Valeur par défaut : Aucun |
maxOffsetsPerTrigger Type : Type de requête Long : diffusion en continu uniquementLimite de débit sur le nombre maximal de décalages ou lignes traités par intervalle de déclencheur. Le nombre total de décalages spécifié sera réparti de manière proportionnelle entre TopicPartitions. Valeur par défaut : Aucun |
startingOffsets Type : Type de requête String : diffusion en continu et lotPoint de départ lors du démarrage d’une requête, soit "earliest" qui correspond aux décalages les plus anciens, "latest" qui correspond aux décalages les plus récents, soit une chaîne JSON spécifiant un décalage de départ pour chaque TopicPartition. Dans JSON, -2 peut être utilisé pour indiquer le décalage le plus ancien et -1 le plus récent.Note : pour les requêtes par lot, le décalage le plus récent (implicitement ou à l’aide de -1 dans JSON) n’est pas autorisé. Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt. Valeur par défaut : "latest" pour la diffusion en continu, "earliest" pour le lot |
startingOffsetsByTimestamp Type : Type de requête String : diffusion en continu et lotChaîne JSON spécifiant un timestamp de départ pour chaque TopicPartition. Les timestamps doivent être fournis en tant que valeur longue du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC , par exemple 1686444353000 . Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Si Kafka ne retourne pas le décalage correspondant, le comportement suit la valeur de l’option startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp est prioritaire par rapport à startingOffsets .Note : Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt. Valeur par défaut : Aucun |
startingOffsetsByTimestampStrategy Type : Type de requête String : diffusion en continu et lotCette stratégie est utilisée lorsque le décalage de départ spécifié par timestamp (global ou par partition) ne correspond pas au décalage Kafka retourné. Les stratégies disponibles sont les suivantes : - "error" : échec de la requête- "latest" : attribue le décalage le plus récent pour ces partitions afin que Spark puisse lire les enregistrements plus récents à partir de ces partitions dans des micro-lots ultérieurs.Valeur par défaut : "error" |
startingTimestamp Type : Type de requête String : diffusion en continu et lotValeur de chaîne du timestamp en millisecondes depuis 1970-01-01 00:00:00 UTC , par exemple "1686444353000" . Consultez la note ci-dessous pour plus d’informations sur le comportement avec les timestamps. Si Kafka ne retourne pas le décalage correspondant, le comportement suit la valeur de l’option startingOffsetsByTimestampStrategy .startingTimestamp est prioritaire sur startingOffsetsByTimestamp et startingOffsets .Note : Pour les requêtes de diffusion en continu, cela s’applique uniquement lorsqu’une nouvelle requête est démarrée. Les requêtes de diffusion en continu redémarrées continuent à partir des décalages définis dans le point de contrôle de la requête. Les partitions nouvellement découvertes au cours d’une requête démarreront au plus tôt. Valeur par défaut : Aucun |
Remarque
Le décalage retourné pour chaque partition est le décalage le plus ancien dont le timestamp est supérieur ou égal au timestamp donné dans la partition correspondante. Le comportement varie selon les options si Kafka ne retourne pas le décalage correspondant. Veuillez vérifier la description de chaque option.
Spark transmet simplement les informations du timestamp à KafkaConsumer.offsetsForTimes
, et n’interprète pas ou ne raisonne pas sur la valeur. Pour en savoir plus sur KafkaConsumer.offsetsForTimes
, consultez la documentation. En outre, la signification du timestamp ici peut varier en fonction de la configuration Kafka (log.message.timestamp.type
). Pour plus d’informations, consultez la documentation Apache Kafka.