Partilhar via


read_pulsar Função com valor de tabela de streaming

Aplica-se a:Marque Sim Databricks SQL Marque Sim Databricks Runtime 14.1 e superior

Importante

Esta funcionalidade está em Pré-visualização Pública.

Retorna uma tabela com registros lidos do Pulsar.

Esta função com valor de tabela suporta apenas streaming e não consulta em lote.

Sintaxe

read_pulsar ( { option_key => option_value } [, ...] )

Argumentos

Esta função requer a invocação de parâmetro nomeado para as teclas de opção.

As opções serviceUrl e topic são obrigatórias.

As descrições dos argumentos são breves aqui. Consulte a documentação estruturada do Pulsar de streaming para obter descrições estendidas.

Opção Type Predefinido Description
serviceUrl STRING Obrigatório O URI do serviço Pulsar.
topic STRING Obrigatório O tópico para ler.
predefinidoSubscrição STRING Nenhuma O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo spark.
subscriptionPrefix STRING Nenhuma Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo spark.
pollTimeoutMs LONGO 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
failOnDataLoss BOOLEANO verdadeiro Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).
iniciandoOffsets STRING mais recente O ponto inicial quando uma consulta é iniciada, a mais antiga, a mais recente ou uma cadeia de caracteres JSON que especifica um deslocamento específico. Se for o mais recente, o leitor lê os registos mais recentes depois de começar a ser executado. Se mais cedo, o leitor lê desde o primeiro offset. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.
startingTime STRING Nenhuma Quando especificado, a fonte Pulsar lerá mensagens a partir da posição do startingTime especificado.

Os seguintes argumentos são usados para autenticação do cliente pulsar:

Opção Type Predefinido Description
pulsarClientAuthPluginClassName STRING Nenhuma Nome do plug-in de autenticação.
pulsarClientAuthParams STRING Nenhuma Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhuma Se deve usar KeyStore para autenticação tls.
pulsarClientTlsTrustStoreType STRING Nenhuma Tipo de arquivo TrustStore para autenticação tls.
pulsarClientTlsTrustStorePath STRING Nenhuma Caminho do arquivo TrustStore para autenticação tls.
pulsarClientTlsTrustStorePassword STRING Nenhuma Senha TrustStore para autenticação tls.

Esses argumentos são usados para configuração e autenticação do controle de admissão de pulsar, a configuração de administração de pulsar só é necessária quando o controle de admissão está habilitado (quando maxBytesPerTrigger é definido)

Opção Type Predefinido Description
maxBytesPerTrigger BIGINT Nenhuma Um limite suave do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado.
adminUrl STRING Nenhuma A configuração do Pulsar serviceHttpUrl. Só é necessário quando maxBytesPerTrigger é especificado.
pulsarAdminAuthPlugin STRING Nenhuma Nome do plug-in de autenticação.
pulsarAdminAuthParams STRING Nenhuma Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhuma Se deve usar KeyStore para autenticação tls.
pulsarAdminTlsTrustStoreType STRING Nenhuma Tipo de arquivo TrustStore para autenticação tls.
pulsarAdminTlsTrustStorePath STRING Nenhuma Caminho do arquivo TrustStore para autenticação tls.
pulsarAdminTlsTrustStorePassword STRING Nenhuma Senha TrustStore para autenticação tls.

Devoluções

Uma tabela de registros de pulsar com o esquema a seguir.

  • __key STRING NOT NULL: Pulsar chave de mensagem.

  • value BINARY NOT NULL: Valor da mensagem pulsar.

    Nota: Para tópicos com esquema Avro ou JSON, em vez de carregar conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes de campo e os tipos de campo do tópico Pulsar.

  • __topic STRING NOT NULL: Nome do tópico pulsar.

  • __messageId BINARY NOT NULL: ID da mensagem pulsar.

  • __publishTime TIMESTAMP NOT NULL: Tempo de publicação da mensagem pulsar.

  • __eventTime TIMESTAMP NOT NULL: Hora do evento de mensagem pulsar.

  • __messageProperties MAP<STRING, STRING>: Propriedades da mensagem pulsar.

Exemplos

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.