read_pulsar
Função com valor de tabela de streaming
Aplica-se a: Databricks SQL
Databricks Runtime 14.1 e superior
Importante
Esta funcionalidade está em Pré-visualização Pública.
Retorna uma tabela com registros lidos do Pulsar.
Esta função com valor de tabela suporta apenas streaming e não consulta em lote.
Sintaxe
read_pulsar ( { option_key => option_value } [, ...] )
Argumentos
Esta função requer a invocação de parâmetro nomeado para as teclas de opção.
As opções serviceUrl
e topic
são obrigatórias.
As descrições dos argumentos são breves aqui. Consulte a documentação estruturada do Pulsar de streaming para obter descrições estendidas.
Opção | Type | Predefinido | Description |
---|---|---|---|
serviceUrl | STRING | Obrigatório | O URI do serviço Pulsar. |
topic | STRING | Obrigatório | O tópico para ler. |
predefinidoSubscrição | STRING | Nenhuma | O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo spark. |
subscriptionPrefix | STRING | Nenhuma | Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo spark. |
pollTimeoutMs | LONGO | 120000 | O tempo limite para ler mensagens do Pulsar em milissegundos. |
failOnDataLoss | BOOLEANO | verdadeiro | Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção). |
iniciandoOffsets | STRING | mais recente | O ponto inicial quando uma consulta é iniciada, a mais antiga, a mais recente ou uma cadeia de caracteres JSON que especifica um deslocamento específico. Se for o mais recente, o leitor lê os registos mais recentes depois de começar a ser executado. Se mais cedo, o leitor lê desde o primeiro offset. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico. |
startingTime | STRING | Nenhuma | Quando especificado, a fonte Pulsar lerá mensagens a partir da posição do startingTime especificado. |
Os seguintes argumentos são usados para autenticação do cliente pulsar:
Opção | Type | Predefinido | Description |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | Nenhuma | Nome do plug-in de autenticação. |
pulsarClientAuthParams | STRING | Nenhuma | Parâmetros para o plug-in de autenticação. |
pulsarClientUseKeyStoreTls | STRING | Nenhuma | Se deve usar KeyStore para autenticação tls. |
pulsarClientTlsTrustStoreType | STRING | Nenhuma | Tipo de arquivo TrustStore para autenticação tls. |
pulsarClientTlsTrustStorePath | STRING | Nenhuma | Caminho do arquivo TrustStore para autenticação tls. |
pulsarClientTlsTrustStorePassword | STRING | Nenhuma | Senha TrustStore para autenticação tls. |
Esses argumentos são usados para configuração e autenticação do controle de admissão de pulsar, a configuração de administração de pulsar só é necessária quando o controle de admissão está habilitado (quando maxBytesPerTrigger é definido)
Opção | Type | Predefinido | Description |
---|---|---|---|
maxBytesPerTrigger | BIGINT | Nenhuma | Um limite suave do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado. |
adminUrl | STRING | Nenhuma | A configuração do Pulsar serviceHttpUrl. Só é necessário quando maxBytesPerTrigger é especificado. |
pulsarAdminAuthPlugin | STRING | Nenhuma | Nome do plug-in de autenticação. |
pulsarAdminAuthParams | STRING | Nenhuma | Parâmetros para o plug-in de autenticação. |
pulsarClientUseKeyStoreTls | STRING | Nenhuma | Se deve usar KeyStore para autenticação tls. |
pulsarAdminTlsTrustStoreType | STRING | Nenhuma | Tipo de arquivo TrustStore para autenticação tls. |
pulsarAdminTlsTrustStorePath | STRING | Nenhuma | Caminho do arquivo TrustStore para autenticação tls. |
pulsarAdminTlsTrustStorePassword | STRING | Nenhuma | Senha TrustStore para autenticação tls. |
Devoluções
Uma tabela de registros de pulsar com o esquema a seguir.
__key STRING NOT NULL
: Pulsar chave de mensagem.value BINARY NOT NULL
: Valor da mensagem pulsar.Nota: Para tópicos com esquema Avro ou JSON, em vez de carregar conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes de campo e os tipos de campo do tópico Pulsar.
__topic STRING NOT NULL
: Nome do tópico pulsar.__messageId BINARY NOT NULL
: ID da mensagem pulsar.__publishTime TIMESTAMP NOT NULL
: Tempo de publicação da mensagem pulsar.__eventTime TIMESTAMP NOT NULL
: Hora do evento de mensagem pulsar.__messageProperties MAP<STRING, STRING>
: Propriedades da mensagem pulsar.
Exemplos
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.