read_pulsar
função de valor de tabela streaming
Aplica-se a: SQL do Databricks Databricks Runtime 14.1 e superior
Importante
Esse recurso está em uma versão prévia.
Retorna uma tabela com registros lidos do Pulsar.
Essa função de valor de tabela só dá suporte a consultas de streaming e não em lote.
Sintaxe
read_pulsar ( { option_key => option_value } [, ...] )
Argumentos
Essa função requer invocação de parâmetro nomeada para as chaves de opção.
As opções serviceUrl
e topic
são obrigatórias.
As descrições dos argumentos são breves aqui. Consulte a documentação do Pulsar de fluxo estruturado para obter descrições detalhadas.
Opção | Type | Padrão | Descrição |
---|---|---|---|
serviceUrl | STRING | Obrigatório | O URI do serviço Pulsar. |
tópico | STRING | Obrigatório | O tópico a ser lido. |
predefinedSubscription | STRING | Nenhum | O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo Spark. |
subscriptionPrefix | STRING | Nenhum | Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo Spark. |
pollTimeoutMs | LONG | 120000 | O tempo limite para ler mensagens do Pulsar em milissegundos. |
failOnDataLoss | BOOLEAN | true | Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção). |
startingOffsets | STRING | mais recente | O ponto inicial quando uma consulta é iniciada, seja o mais antigo, o mais recente ou uma cadeia de caracteres JSON que especifica um deslocamento específico. Se for mais recente, o leitor lerá os registros mais recentes depois de iniciar a execução. Se for o mais antigo, o leitor lerá a partir do deslocamento mais antigo. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico. |
startingTime | STRING | Nenhum | Quando especificado, a origem Pulsar fará a leitura das mensagens a partir da posição do startingTime especificado. |
Os argumentos a seguir são usados para autenticação do cliente do Pulsar:
Opção | Type | Padrão | Descrição |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Nenhum | Nome do plug-in de autenticação. |
pulsarClientAuthParams | STRING | Nenhum | Parâmetros para o plug-in de autenticação. |
pulsarClientUseKeyStoreTls | STRING | Nenhum | Se deve usar o KeyStore para autenticação tls. |
pulsarClientTlsTrustStoreType | STRING | Nenhum | Tipo de arquivo TrustStore para autenticação tls. |
pulsarClientTlsTrustStorePath | STRING | Nenhum | Caminho do arquivo TrustStore para autenticação tls. |
pulsarClientTlsTrustStorePassword | STRING | Nenhum | Senha do TrustStore para autenticação tls. |
Esses argumentos são usados para configuração e autenticação do controle de admissão do pulsar. A configuração do administrador do pulsar só é necessária quando o controle de admissão está habilitado (quando maxBytesPerTrigger está definido)
Opção | Type | Padrão | Descrição |
---|---|---|---|
maxBytesPerTrigger | bigint | Nenhum | Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, a admin.url também precisará ser especificada. |
adminUrl | STRING | Nenhum | A configuração serviceHttpUrl do Pulsar. Necessário somente quando o maxBytesPerTrigger é especificado. |
pulsarAdminAuthPlugin | STRING | Nenhum | Nome do plug-in de autenticação. |
pulsarAdminAuthParams | STRING | Nenhum | Parâmetros para o plug-in de autenticação. |
pulsarClientUseKeyStoreTls | STRING | Nenhum | Se deve usar o KeyStore para autenticação tls. |
pulsarAdminTlsTrustStoreType | STRING | Nenhum | Tipo de arquivo TrustStore para autenticação tls. |
pulsarAdminTlsTrustStorePath | STRING | Nenhum | Caminho do arquivo TrustStore para autenticação tls. |
pulsarAdminTlsTrustStorePassword | STRING | Nenhum | Senha do TrustStore para autenticação tls. |
Retornos
Uma tabela de registros de pulsar com o seguinte esquema.
__key STRING NOT NULL
: Chave de mensagem do pulsar.value BINARY NOT NULL
: Valor da mensagem do pulsar.Observação: para tópicos com esquema Avro ou JSON, em vez de carregar o conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes e os tipos de campo do tópico do Pulsar.
__topic STRING NOT NULL
: Nome do tópico do pulsar.__messageId BINARY NOT NULL
: ID da mensagem do pulsar.__publishTime TIMESTAMP NOT NULL
: Hora de publicação da mensagem do pulsar.__eventTime TIMESTAMP NOT NULL
: Hora do evento da mensagem do pulsar.__messageProperties MAP<STRING, STRING>
: Propriedades da mensagem do pulsar.
Exemplos
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.