Compartilhar via


Transmitir do Apache Pulsar

Importante

Esse recurso está em uma versão prévia.

No Databricks Runtime 14.1 e superior, você pode usar o Streaming Estruturado para transmitir dados do Apache Pulsar no Azure Databricks.

O Streaming Estruturado fornece semântica de processamento exatamente uma vez para os dados lidos de fontes pulsares.

Exemplo de sintaxe

Veja a seguir um exemplo básico de como usar o Structured Streaming para ler do Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Você sempre deve fornecer um service.url e uma das seguintes opções para especificar os tópicos:

  • topic
  • topics
  • topicsPattern

Para obter uma lista completa de opções, consulte Configurar opções para leitura de streaming Pulsar.

Autenticar no Pulsar

O Azure Databricks dá suporte à autenticação de repositório de chaves e de armazenamento de chaves para o Pulsar. O Databricks recomenda o uso de segredos ao armazenar detalhes de configuração.

Você pode definir as seguintes opções durante a configuração do fluxo:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Se o fluxo usar um PulsarAdmin, também defina o seguinte:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

O exemplo a seguir demonstra a configuração das opções de autenticação:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Esquema do Pulsar

O esquema dos registros lidos do Pulsar depende de como os tópicos têm seus esquemas codificados.

  • Para os tópicos com esquema Avro ou JSON, os nomes de campo e os tipos de campo são preservados no DataFrame do Spark resultante.
  • Para os tópicos sem esquema ou com um tipo de dados simples no Pulsar, a carga útil é carregada em uma coluna value.
  • Se o leitor estiver configurado para ler vários tópicos com esquemas diferentes, defina allowDifferentTopicSchemas para carregar o conteúdo bruto em uma coluna value.

Os registros Pulsar têm os seguintes campos de metadados:

Coluna Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurar opções para leitura de streaming do Pulsar

Todas as opções são configuradas como parte de uma leitura de Streaming Estruturado usando a sintaxe .option("<optionName>", "<optionValue>"). Você também pode configurar a autenticação usando as opções. Consulte Autenticar no Pulsar.

A tabela a seguir descreve as configurações necessárias para o Pulsar. Você deve especificar apenas uma das opções topic, topics ou topicsPattern.

Opção Valor padrão Descrição
service.url nenhum A configuração do Pulsar serviceUrl no serviço do Pulsar.
topic nenhum Uma cadeia de caracteres de nome de tópico para o tópico a ser consumido.
topics nenhum Uma lista separada por vírgulas dos tópicos a serem consumidos.
topicsPattern nenhum Uma cadeia de caracteres de expressão regular do Java a ser correspondida em tópicos a serem consumidos.

A tabela a seguir descreve outras opções com suporte do Pulsar:

Opção Valor padrão Descrição
predefinedSubscription nenhum O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo Spark.
subscriptionPrefix nenhum Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo Spark.
pollTimeoutMs 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
waitingForNonExistedTopic false Se o conector deve aguardar até que os tópicos desejados sejam criados.
failOnDataLoss true Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção).
allowDifferentTopicSchemas false Se vários tópicos com esquemas diferentes forem lidos, use esse parâmetro para desativar a desserialização automática do valor do tópico baseado em esquema. Somente os valores brutos são retornados quando isso for true.
startingOffsets latest Se latest, o leitor lê os registros mais recentes depois que ele começar a ser executado. Se earliest, o leitor lerá do deslocamento mais antigo. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.
maxBytesPerTrigger nenhum Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado.
admin.url nenhum A configuração serviceHttpUrl do Pulsar. Só é necessário quando maxBytesPerTrigger é especificado.

Você também pode especificar as configurações de cliente, administrador e leitor do Pulsar usando os seguintes padrões:

Padrão Vincular às opções de conifiguração
pulsar.client.* Configuração do cliente do Pulsar
pulsar.admin.* Configuração do administrador do Pulsar
pulsar.reader.* Configuração do leitor do Pulsar

Construa o deslocamentos iniciais do JSON

Você pode construir manualmente uma ID de mensagem para especificar um deslocamento específico e passá-lo como um JSON para a opção startingOffsets. O exemplo de código a seguir demonstra esse padrão:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()