read_kafka
função com valor de tabela
Aplica-se a: Databricks SQL Databricks Runtime 13.3 LTS e superior
Lê dados de um cluster Apache Kafka e retorna os dados em forma de tabela.
Pode ler dados de um ou mais tópicos de Kafka. Ele suporta consultas em lote e ingestão de streaming.
Sintaxe
read_kafka([option_key => option_value ] [, ...])
Argumentos
Esta função requer invocação de parâmetro nomeado.
option_key
: O nome da opção a ser configurada. Você deve usar backticks (') para opções que contêm pontos (.
).option_value
: Uma expressão constante para definir a opção. Aceita literais e funções escalares.
Devoluções
Registros lidos de um cluster Apache Kafka com o seguinte esquema:
key BINARY
: A chave do disco de Kafka.value BINARY NOT NULL
: O valor do registro de Kafka.topic STRING NOT NULL
: O nome do tópico Kafka do qual o registro é lido.partition INT NOT NULL
: O ID da partição Kafka a partir da qual o registro é lido.offset BIGINT NOT NULL
: O número de deslocamento do registro no KafkaTopicPartition
.timestamp TIMESTAMP NOT NULL
: Um valor de carimbo de data/hora para o registro. AtimestampType
coluna define a que corresponde esse carimbo de data/hora.timestampType INTEGER NOT NULL
: O tipo do carimbo de data/hora especificado natimestamp
coluna.headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>
: Valores de cabeçalho fornecidos como parte do registro (se habilitado).
Exemplos
-- A batch query to read from a topic.
> SELECT value::string as value
FROM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
) LIMIT 10;
-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
SELECT
value::string:events, -- extract the field `events`
to_timestamp(value::string:ts) as ts -- extract the field `ts` and cast to timestamp
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'events'
);
Opções
Você pode encontrar uma lista detalhada de opções na documentação do Apache Spark.
Opções necessárias
Forneça a opção abaixo para se conectar ao seu cluster Kafka.
Opção |
---|
bootstrapServers Tipo: String Uma lista separada por vírgulas de pares host/porta apontando para o cluster Kafka. Valor padrão: Nenhum |
Forneça apenas uma das opções abaixo para configurar quais tópicos do Kafka extrair dados.
Opção |
---|
assign Tipo: String Uma cadeia de caracteres JSON que contém as partições de tópico específicas a serem consumidas. Por exemplo, para '{"topicA":[0,1],"topicB":[2,4]}' , as partições 0'th e 1st do topicA serão consumidas a partir de.Valor padrão: Nenhum |
subscribe Tipo: String Uma lista separada por vírgulas de tópicos de Kafka para ler. Valor padrão: Nenhum |
subscribePattern Tipo: String Uma expressão regular que corresponde aos tópicos para assinar. Valor padrão: Nenhum |
Opções diversas
read_kafka
pode ser usado em consultas em lote e em consultas de streaming. As opções abaixo especificam a que tipo de consulta se aplicam.
Opção |
---|
endingOffsets Tipo: Tipo de consulta: String apenas loteOs deslocamentos a serem lidos até para uma consulta em lotes, seja "latest" para especificar os registros mais recentes ou uma cadeia de caracteres JSON especificando um deslocamento final para cada TopicPartition. No JSON, -1 como um deslocamento pode ser usado para se referir ao mais recente. -2 (mais cedo) como um deslocamento não é permitido.Valor predefinido: "latest" |
endingOffsetsByTimestamp Tipo: Tipo de consulta: String apenas loteUma cadeia de caracteres JSON especificando um carimbo de data/hora final para leitura até para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo1686444353000 . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora.endingOffsetsByTimestamp tem precedência sobre endingOffsets .Valor padrão: Nenhum |
endingTimestamp Tipo: Tipo de consulta: String apenas loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Se Kafka não retornar a compensação correspondente, a compensação será definida como a mais recente. Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Nota: endingTimestamp tem precedência sobre endingOffsetsByTimestamp eendingOffsets .Valor padrão: Nenhum |
includeHeaders Tipo: Tipo de consulta: Boolean streaming e loteSe os cabeçalhos de Kafka devem ser incluídos na linha. Valor predefinido: false |
kafka.<consumer_option> Tipo: Tipo de consulta: String streaming e loteQualquer opção específica do consumidor Kafka pode ser passada com o prefixo kafka. . Essas opções precisam ser cercadas por backticks quando fornecidas, caso contrário, você receberá um erro de analisador. Você pode encontrar as opções na documentação de Kafka.Nota: Não deve definir as seguintes opções com esta função: key.deserializer , value.deserializer , bootstrap.servers , group.id Valor padrão: Nenhum |
maxOffsetsPerTrigger Tipo: Long Tipo de consulta: apenas streamingLimite de taxa para o número máximo de deslocamentos ou linhas processadas por intervalo de gatilho. O número total especificado de deslocamentos será dividido proporcionalmente em TopicPartitions. Valor padrão: Nenhum |
startingOffsets Tipo: Tipo de consulta: String streaming e loteO ponto inicial quando uma consulta é iniciada, que "earliest" é dos primeiros deslocamentos, "latest" que é apenas dos deslocamentos mais recentes, ou uma cadeia de caracteres JSON especificando um deslocamento inicial para cada TopicPartition. No JSON, -2 como um offset pode ser usado para se referir ao mais antigo, -1 ao mais recente.Nota: Para consultas em lote, o mais recente (implicitamente ou usando -1 em JSON) não é permitido. Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo. Valor padrão: "latest" para streaming, "earliest" para lote |
startingOffsetsByTimestamp Tipo: Tipo de consulta: String streaming e loteUma cadeia de caracteres JSON especificando um carimbo de data/hora inicial para cada TopicPartition. Os carimbos de data/hora precisam ser fornecidos como um valor longo do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo 1686444353000 , . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy .startingOffsetsByTimestamp tem precedência sobre startingOffsets .Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta começarão no mínimo. Valor padrão: Nenhum |
startingOffsetsByTimestampStrategy Tipo: Tipo de consulta: String streaming e loteEssa estratégia é usada quando o deslocamento inicial especificado por carimbo de data/hora (global ou por partição) não corresponde ao deslocamento que Kafka retornou. As estratégias disponíveis são: - "error" : falhar na consulta- "latest" : atribui o deslocamento mais recente para essas partições para que o Spark possa ler registros mais recentes dessas partições em microlotes posteriores.Valor predefinido: "error" |
startingTimestamp Tipo: Tipo de consulta: String streaming e loteUm valor de cadeia de caracteres do carimbo de data/hora em milissegundos desde 1970-01-01 00:00:00 UTC , por exemplo "1686444353000" . Veja a nota abaixo sobre detalhes do comportamento com carimbos de data/hora. Se Kafka não retornar o deslocamento correspondente, o comportamento seguirá para o valor da opção startingOffsetsByTimestampStrategy .startingTimestamp tem precedência sobre startingOffsetsByTimestamp e startingOffsets .Nota: Para consultas de streaming, isso só se aplica quando uma nova consulta é iniciada. As consultas de streaming reiniciadas continuarão a partir dos deslocamentos definidos no ponto de verificação da consulta. As partições recém-descobertas durante uma consulta serão iniciadas mais cedo. Valor padrão: Nenhum |
Nota
O deslocamento retornado para cada partição é o deslocamento mais antigo cujo carimbo de data/hora é maior ou igual ao carimbo de data/hora fornecido na partição correspondente. O comportamento varia entre as opções se Kafka não retornar o deslocamento correspondente - verifique a descrição de cada opção.
O Spark simplesmente passa as informações do carimbo de data/hora para KafkaConsumer.offsetsForTimes
o , e não interpreta ou raciocina sobre o valor. Para obter mais detalhes sobre KafkaConsumer.offsetsForTimes
o , consulte a documentação. Além disso, o significado de carimbo de data/hora aqui pode variar de acordo com a configuração de Kafka (log.message.timestamp.type
). Para obter detalhes, consulte a documentação do Apache Kafka.