read_kafka
función con valores de tabla
Se aplica a: Databricks SQL Databricks Runtime 13.3 LTS y versiones posteriores
Lee datos de un clúster de Apache Kafka y los devuelve en formato tabular.
Puede leer datos de uno o varios temas de Kafka. Admite tanto las consultas por lotes como la ingesta de streaming.
Sintaxis
read_kafka([option_key => option_value ] [, ...])
Argumentos
Esta función requiere la invocación de parámetros con nombre.
option_key
: nombre de la opción que se va a configurar. Debe usar el acento grave (`) para las opciones que contienen puntos (.
).option_value
: expresión constante para establecer la opción. Acepta literales y funciones escalares.
Devoluciones
Registros leídos de un clúster de Apache Kafka con el siguiente esquema:
key BINARY
: clave del registro de Kafka.value BINARY NOT NULL
: valor del registro de Kafka.topic STRING NOT NULL
: nombre del tema de Kafka del que se lee el registro.partition INT NOT NULL
: identificador de la partición de Kafka de la que se lee el registro.offset BIGINT NOT NULL
: número de desplazamiento del registro de laTopicPartition
de Kafka.timestamp TIMESTAMP NOT NULL
: valor de marca de tiempo del registro. La columnatimestampType
define a qué corresponde esta marca de tiempo.timestampType INTEGER NOT NULL
: tipo de la marca de tiempo que se especifica en la columnatimestamp
.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: valores de encabezado proporcionados como parte del registro (si está habilitado).
Ejemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opciones
Encontrará una lista detallada de opciones en la documentación de Apache Spark.
Opciones necesarias
Proporcione la siguiente opción para conectarse al clúster de Kafka.
Opción |
---|
bootstrapServers Tipo: String Lista separada por comas de pares host/puerto que apuntan al clúster de Kafka. Valor predeterminado: ninguno |
Proporcione solo una de las siguientes opciones para configurar de qué temas de Kafka deben extraerse datos.
Opción |
---|
assign Tipo: String Cadena JSON que contiene las particiones de temas específicas de las que se van a consumir. Por ejemplo, para '{"topicA":[0,1],"topicB":[2,4]}' , se consumirán de las particiones 0 y 1 de topicA.Valor predeterminado: ninguno |
subscribe Tipo: String Lista separada por comas de temas de Kafka para leer. Valor predeterminado: ninguno |
subscribePattern Tipo: String Expresión regular que coincide con los temas a los que va a suscribirse. Valor predeterminado: ninguno |
Otras opciones
read_kafka
se puede usar en consultas por lotes y en consultas de streaming. Las siguientes opciones especifican a qué tipo de consulta se aplican.
Opción |
---|
endingOffsets Tipo: String Tipo de consulta: solo loteDesplazamientos hasta los que se va a leer para una consulta por lotes, ya sea "latest" para especificar los registros más recientes o una cadena JSON que especifique un desplazamiento final para cada TopicPartition. En JSON, se puede usar -1 como desplazamiento para referirse al más reciente. No se permite -2 (el más antiguo) como desplazamiento.Valor predeterminado: "latest" |
endingOffsetsByTimestamp Tipo: String Tipo de consulta: solo loteCadena JSON que especifica una marca de tiempo final hasta la que se va a leer para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC , por ejemplo,1686444353000 . Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo.endingOffsetsByTimestamp tiene prioridad sobre endingOffsets .Valor predeterminado: ninguno |
endingTimestamp Tipo: String Tipo de consulta: solo loteValor de cadena de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC ; por ejemplo, "1686444353000" . Si Kafka no devuelve el desplazamiento coincidente, este se establecerá como el más reciente. Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Nota: endingTimestamp tiene prioridad sobre endingOffsetsByTimestamp yendingOffsets .Valor predeterminado: ninguno |
includeHeaders Tipo: Boolean Tipo de consulta: streaming y loteIndica si se van a incluir los encabezados de Kafka en la fila. Valor predeterminado: false |
kafka.<consumer_option> Tipo: String Tipo de consulta: streaming y loteLas opciones específicas de consumidor de Kafka se pueden pasar con el prefijo kafka. . Estas opciones deben especificarse con acentos graves; de lo contrario, recibirá un error del analizador. Encontrará las opciones en la documentación de Kafka.Nota: No debe establecer las siguientes opciones con esta función: key.deserializer , value.deserializer , bootstrap.servers , group.id Valor predeterminado: ninguno |
maxOffsetsPerTrigger Tipo: Long Tipo de consulta: solo streamingNúmero máximo de desplazamientos o filas que se procesan por intervalo del desencadenador. El número total especificado de desplazamientos se dividirá proporcionalmente entre las TopicPartitions. Valor predeterminado: ninguno |
startingOffsets Tipo: String Tipo de consulta: streaming y lotePunto inicial cuando se inicia una consulta, ya sea "earliest" , que procede de los desplazamientos más antiguos; "latest" , que procede de los desplazamientos más recientes; o bien una cadena JSON que especifique un desplazamiento inicial para cada TopicPartition. En JSON, se puede usar -2 como desplazamiento para referirse al más antiguo y -1 al más reciente.Nota: En el caso de las consultas por lotes, no se permite la más reciente (ya sea implícitamente o mediante el uso de -1 en JSON). Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio. Valor predeterminado: "latest" para streaming y "earliest" para lote |
startingOffsetsByTimestamp Tipo: String Tipo de consulta: streaming y loteCadena JSON que especifica una marca de tiempo inicial para cada TopicPartition. Las marcas de tiempo deben proporcionarse como un valor largo de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC ; por ejemplo, 1686444353000 . Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento seguirá al valor de la opción startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp tiene prioridad sobre startingOffsets .Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta se iniciarán al principio. Valor predeterminado: ninguno |
startingOffsetsByTimestampStrategy Tipo: String Tipo de consulta: streaming y loteEsta estrategia se usa cuando el desplazamiento inicial especificado por marca de tiempo (global o por partición) no coincide con el desplazamiento de Kafka que se ha devuelto. Las estrategias disponibles son las siguientes: - "error" : se produce un error en la consulta- "latest" : asigna el desplazamiento más reciente para estas particiones para que Spark pueda leer registros más recientes de estas particiones en microlotes posteriores.Valor predeterminado: "error" |
startingTimestamp Tipo: String Tipo de consulta: streaming y loteValor de cadena de la marca de tiempo en milisegundos desde 1970-01-01 00:00:00 UTC ; por ejemplo, "1686444353000" . Consulte la siguiente nota sobre los detalles del comportamiento con marcas de tiempo. Si Kafka no devuelve el desplazamiento coincidente, el comportamiento seguirá al valor de la opción startingOffsetsByTimestampStrategy .startingTimestamp tiene prioridad sobre startingOffsetsByTimestamp y startingOffsets .Nota: Para las consultas de streaming, solo se aplica cuando se inicia una nueva consulta. Las consultas de streaming reiniciadas continuarán desde los desplazamientos definidos en el punto de control de consultas. Las particiones recién detectadas durante una consulta comenzarán antes. Valor predeterminado: ninguno |
Nota:
El desplazamiento devuelto para cada partición es el desplazamiento más antiguo cuya marca de tiempo es igual o mayor que la marca de tiempo especificada en la partición correspondiente. El comportamiento varía entre las opciones si Kafka no devuelve el desplazamiento coincidente (consulte la descripción de cada opción).
Spark solo pasa la información de la marca de tiempo a KafkaConsumer.offsetsForTimes
, no interpreta ni da ningún motivo sobre el valor. Para obtener más información sobre KafkaConsumer.offsetsForTimes
, consulte la documentación. Además, el significado de la marca de tiempo de aquí puede variar según la configuración de Kafka (log.message.timestamp.type
). Para obtener más información, consulte la documentación de Apache Kafka.