Flusso da Apache Pulsar
Importante
Questa funzionalità è disponibile in anteprima pubblica.
In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.
Structured Streaming offre una semantica di elaborazione esattamente una volta per i dati letti dalle origini Pulsar.
Esempio di sintassi
Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Per specificare gli argomenti, è necessario specificare sempre un service.url
e una delle opzioni seguenti:
topic
topics
topicsPattern
Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.
Eseguire l'autenticazione a Pulsar
Azure Databricks supporta l'autenticazione dell'archivio attendibilità e dell'archivio chiavi in Pulsar. Databricks consiglia di usare i segreti durante l'archiviazione dei dettagli di configurazione.
È possibile impostare le opzioni seguenti durante la configurazione del flusso:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Se il flusso usa , PulsarAdmin
impostare anche quanto segue:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
L'esempio seguente illustra la configurazione delle opzioni di autenticazione:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Schema Pulsar
Lo schema dei record letti da Pulsar dipende dal modo in cui gli argomenti hanno i relativi schemi codificati.
- Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
- Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una
value
colonna. - Se il lettore è configurato per leggere più argomenti con schemi diversi, impostare
allowDifferentTopicSchemas
per caricare il contenuto non elaborato in unavalue
colonna.
I record Pulsar hanno i campi di metadati seguenti:
Column | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configurare le opzioni per la lettura in streaming Pulsar
Tutte le opzioni vengono configurate come parte di una lettura structured streaming usando .option("<optionName>", "<optionValue>")
la sintassi. È anche possibile configurare l'autenticazione usando le opzioni. Vedere Eseguire l'autenticazione a Pulsar.
Nella tabella seguente vengono descritte le configurazioni necessarie per Pulsar. È necessario specificare solo una delle opzioni topic
o topics
topicsPattern
.
Opzione | Default value | Descrizione |
---|---|---|
service.url |
Nessuno | Configurazione pulsar per il servizio Pulsar serviceUrl . |
topic |
Nessuno | Stringa del nome dell'argomento da utilizzare. |
topics |
Nessuno | Elenco delimitato da virgole degli argomenti da utilizzare. |
topicsPattern |
Nessuno | Stringa regex Java da trovare in base agli argomenti da utilizzare. |
La tabella seguente descrive altre opzioni supportate per Pulsar:
Opzione | Default value | Descrizione |
---|---|---|
predefinedSubscription |
Nessuno | Nome di sottoscrizione predefinito usato dal connettore per tenere traccia dello stato dell'applicazione Spark. |
subscriptionPrefix |
Nessuno | Prefisso usato dal connettore per generare una sottoscrizione casuale per tenere traccia dello stato dell'applicazione Spark. |
pollTimeoutMs |
120000 | Timeout per la lettura dei messaggi da Pulsar in millisecondi. |
waitingForNonExistedTopic |
false |
Indica se il connettore deve attendere fino a quando non vengono creati gli argomenti desiderati. |
failOnDataLoss |
true |
Controlla se non eseguire una query quando i dati vengono persi( ad esempio, gli argomenti vengono eliminati o i messaggi vengono eliminati a causa dei criteri di conservazione). |
allowDifferentTopicSchemas |
false |
Se vengono letti più argomenti con schemi diversi, usare questo parametro per disattivare la deserializzazione automatica del valore dell'argomento basato su schema. Quando si tratta true di , vengono restituiti solo i valori non elaborati. |
startingOffsets |
latest |
Se latest , il lettore legge i record più recenti dopo l'avvio dell'esecuzione. Se earliest , il lettore legge dall'offset meno recente. L'utente può anche specificare una stringa JSON che specifica un offset specifico. |
maxBytesPerTrigger |
Nessuno | Limite flessibile del numero massimo di byte da elaborare per microbatch. Se questa opzione è specificata, admin.url è necessario specificare anche . |
admin.url |
Nessuno | Configurazione di Pulsar serviceHttpUrl . È necessario solo quando maxBytesPerTrigger viene specificato . |
È anche possibile specificare qualsiasi configurazione client, amministratore e lettore Pulsar usando i modelli seguenti:
Modello | Collegamento alle opzioni di conifigurazione |
---|---|
pulsar.client.* |
Configurazione del client Pulsar |
pulsar.admin.* |
Configurazione dell'amministratore pulsar |
pulsar.reader.* |
Configurazione del lettore Pulsar |
Costruire offset iniziali JSON
È possibile costruire manualmente un ID messaggio per specificare un offset specifico e passarlo come JSON all'opzione startingOffsets
. L'esempio di codice seguente illustra questa sintassi:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()