Strömma från Apache Pulsar
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
I Databricks Runtime 14.1 och senare kan du använda Structured Streaming för att strömma data från Apache Pulsar på Azure Databricks.
Strukturerad direktuppspelning ger exakt en gång bearbetningssemantik för data som lästs från Pulsar-källor.
Syntaxexempel
Följande är ett grundläggande exempel på hur du använder Structured Streaming för att läsa från Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Du måste alltid ange ett service.url
och något av följande alternativ för att ange ämnen:
topic
topics
topicsPattern
En fullständig lista över alternativ finns i Konfigurera alternativ för Pulsar-strömning.
Autentisera till Pulsar
Azure Databricks stöder autentisering med förtroendearkiv och nyckelarkiv till Pulsar. Databricks rekommenderar att du använder hemligheter när du lagrar konfigurationsinformation.
Du kan ange följande alternativ under strömkonfigurationen:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Om strömmen använder en PulsarAdmin
anger du även följande:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
I följande exempel visas hur du konfigurerar autentiseringsalternativ:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar-schema
Schemat för poster som läss från Pulsar beror på hur ämnen har sina scheman kodade.
- För ämnen med Avro- eller JSON-schema bevaras fältnamn och fälttyper i den resulterande Spark DataFrame.
- För ämnen utan schema eller med en enkel datatyp i Pulsar läses nyttolasten in i en
value
kolumn. - Om läsaren är konfigurerad för att läsa flera ämnen med olika scheman anger du
allowDifferentTopicSchemas
för att läsa in råinnehållet i envalue
kolumn.
Pulsar-poster har följande metadatafält:
Column | Typ |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Konfigurera alternativ för läsläsning av Pulsar-strömning
Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>")
syntax. Du kan också konfigurera autentisering med hjälp av alternativ. Se Autentisera till Pulsar.
I följande tabell beskrivs nödvändiga konfigurationer för Pulsar. Du måste bara ange ett av alternativen topic
, topics
eller topicsPattern
.
Alternativ | Standardvärde | Description |
---|---|---|
service.url |
inget | Pulsar-konfigurationen serviceUrl för Pulsar-tjänsten. |
topic |
inget | En ämnesnamnsträng som ämnet ska använda. |
topics |
inget | En kommaavgränsad lista över de ämnen som ska användas. |
topicsPattern |
inget | En Java regex-sträng som matchar ämnen som ska användas. |
I följande tabell beskrivs andra alternativ som stöds för Pulsar:
Alternativ | Standardvärde | Description |
---|---|---|
predefinedSubscription |
inget | Det fördefinierade prenumerationsnamnet som används av anslutningsappen för att spåra spark-programmets förlopp. |
subscriptionPrefix |
inget | Ett prefix som används av anslutningsappen för att generera en slumpmässig prenumeration för att spåra spark-programförloppet. |
pollTimeoutMs |
120000 | Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder. |
waitingForNonExistedTopic |
false |
Om anslutningsappen ska vänta tills de önskade ämnena har skapats. |
failOnDataLoss |
true |
Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip). |
allowDifferentTopicSchemas |
false |
Om du läser flera ämnen med olika scheman kan du använda den här parametern för att inaktivera automatisk schemabaserad deserialisering av ämnesvärden. Endast råvärdena returneras när detta är true . |
startingOffsets |
latest |
Om latest läser läsaren de senaste posterna när den börjar köras. Om earliest läser läsaren från den tidigaste förskjutningen. Användaren kan också ange en JSON-sträng som anger en specifik förskjutning. |
maxBytesPerTrigger |
inget | En mjuk gräns för det maximala antalet byte som vi vill bearbeta per mikrobatch. Om detta anges admin.url måste du också anges. |
admin.url |
inget | Pulsar-konfigurationen serviceHttpUrl . Behövs endast när maxBytesPerTrigger anges. |
Du kan också ange pulsar-klient-, administratörs- och läsarkonfigurationer med hjälp av följande mönster:
Mönster | Länka till konfigurationsalternativ |
---|---|
pulsar.client.* |
Pulsar-klientkonfiguration |
pulsar.admin.* |
Pulsar-administratörskonfiguration |
pulsar.reader.* |
Pulsar-läsarkonfiguration |
Konstruktionsstarten förskjuter JSON
Du kan manuellt konstruera ett meddelande-ID för att ange en specifik förskjutning och skicka detta som en JSON till alternativet startingOffsets
. Följande kodexempel visar den här syntaxen:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()