Compartilhar via


read_pulsar função de valor de tabela streaming

Aplica-se a: verificação marcada como sim SQL do Databricks marca de seleção positiva Databricks Runtime 14.1 e superior

Importante

Esse recurso está em uma versão prévia.

Retorna uma tabela com registros lidos do Pulsar.

Essa função de valor de tabela só dá suporte a consultas de streaming e não em lote.

Sintaxe

read_pulsar ( { option_key => option_value } [, ...] )

Argumentos

Essa função requer invocação de parâmetro nomeada para as chaves de opção.

As opções serviceUrl e topic são obrigatórias.

As descrições dos argumentos são breves aqui. Consulte a documentação do Pulsar de fluxo estruturado para obter descrições detalhadas.

Opção Type Padrão Descrição
serviceUrl STRING Obrigatório O URI do serviço Pulsar.
tópico STRING Obrigatório O tópico a ser lido.
predefinedSubscription STRING Nenhum O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo Spark.
subscriptionPrefix STRING Nenhum Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo Spark.
pollTimeoutMs LONG 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
failOnDataLoss BOOLEAN true Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção).
startingOffsets STRING mais recente O ponto inicial quando uma consulta é iniciada, seja o mais antigo, o mais recente ou uma cadeia de caracteres JSON que especifica um deslocamento específico. Se for mais recente, o leitor lerá os registros mais recentes depois de iniciar a execução. Se for o mais antigo, o leitor lerá a partir do deslocamento mais antigo. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.
startingTime STRING Nenhum Quando especificado, a origem Pulsar fará a leitura das mensagens a partir da posição do startingTime especificado.

Os argumentos a seguir são usados para autenticação do cliente do Pulsar:

Opção Type Padrão Descrição
pulsarClientAuthPluginClassName STRING Nenhum Nome do plug-in de autenticação.
pulsarClientAuthParams STRING Nenhum Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhum Se deve usar o KeyStore para autenticação tls.
pulsarClientTlsTrustStoreType STRING Nenhum Tipo de arquivo TrustStore para autenticação tls.
pulsarClientTlsTrustStorePath STRING Nenhum Caminho do arquivo TrustStore para autenticação tls.
pulsarClientTlsTrustStorePassword STRING Nenhum Senha do TrustStore para autenticação tls.

Esses argumentos são usados para configuração e autenticação do controle de admissão do pulsar. A configuração do administrador do pulsar só é necessária quando o controle de admissão está habilitado (quando maxBytesPerTrigger está definido)

Opção Type Padrão Descrição
maxBytesPerTrigger bigint Nenhum Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, a admin.url também precisará ser especificada.
adminUrl STRING Nenhum A configuração serviceHttpUrl do Pulsar. Necessário somente quando o maxBytesPerTrigger é especificado.
pulsarAdminAuthPlugin STRING Nenhum Nome do plug-in de autenticação.
pulsarAdminAuthParams STRING Nenhum Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhum Se deve usar o KeyStore para autenticação tls.
pulsarAdminTlsTrustStoreType STRING Nenhum Tipo de arquivo TrustStore para autenticação tls.
pulsarAdminTlsTrustStorePath STRING Nenhum Caminho do arquivo TrustStore para autenticação tls.
pulsarAdminTlsTrustStorePassword STRING Nenhum Senha do TrustStore para autenticação tls.

Retornos

Uma tabela de registros de pulsar com o seguinte esquema.

  • __key STRING NOT NULL: Chave de mensagem do pulsar.

  • value BINARY NOT NULL: Valor da mensagem do pulsar.

    Observação: para tópicos com esquema Avro ou JSON, em vez de carregar o conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes e os tipos de campo do tópico do Pulsar.

  • __topic STRING NOT NULL: Nome do tópico do pulsar.

  • __messageId BINARY NOT NULL: ID da mensagem do pulsar.

  • __publishTime TIMESTAMP NOT NULL: Hora de publicação da mensagem do pulsar.

  • __eventTime TIMESTAMP NOT NULL: Hora do evento da mensagem do pulsar.

  • __messageProperties MAP<STRING, STRING>: Propriedades da mensagem do pulsar.

Exemplos

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.