Stream vanuit Apache Pulsar
Belangrijk
Deze functie is beschikbaar als openbare preview.
In Databricks Runtime 14.1 en hoger kunt u Structured Streaming gebruiken om gegevens van Apache Pulsar op Azure Databricks te streamen.
Structured Streaming biedt exact eenmaal verwerkingssemantiek voor gegevens die uit Pulsar-bronnen worden gelezen.
Voorbeeld van syntaxis
Hier volgt een eenvoudig voorbeeld van het gebruik van Structured Streaming om te lezen uit Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
U moet altijd een service.url
en een van de volgende opties opgeven om onderwerpen op te geven:
topic
topics
topicsPattern
Zie Opties configureren voor Pulsar-streamingvoor een volledige lijst met opties.
Verifiëren bij Pulsar
Azure Databricks biedt ondersteuning voor truststore- en sleutelopslagverificatie voor Pulsar. Databricks raadt aan geheimen te gebruiken bij het opslaan van configuratiegegevens.
U kunt de volgende opties instellen tijdens de streamconfiguratie:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Als de stream gebruikmaakt van een PulsarAdmin
, stelt u ook het volgende in:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
In het volgende voorbeeld ziet u hoe u verificatieopties configureert:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar-schema
Het schema van records dat uit Pulsar wordt gelezen, is afhankelijk van de wijze waarop de schema's van de onderwerpen zijn gecodeerd.
- Voor onderwerpen met avro- of JSON-schema blijven veldnamen en veldtypen behouden in het resulterende Spark DataFrame.
- Voor topics zonder schema of met een eenvoudig gegevenstype in Pulsar wordt de payload geladen in een
value
kolom. - Als de lezer is geconfigureerd voor het lezen van meerdere onderwerpen met verschillende schema's, stelt u
allowDifferentTopicSchemas
in om de onbewerkte inhoud te laden in eenvalue
kolom.
Pulsar-records hebben de volgende metagegevensvelden:
Kolom | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Opties configureren voor het lezen van Pulsar-streaming
Alle opties worden geconfigureerd als onderdeel van een Structured Streaming-leesbewerking met behulp van .option("<optionName>", "<optionValue>")
syntaxis. U kunt verificatie ook configureren met behulp van opties. Zie Verifiëren bij Pulsar.
In de volgende tabel worden de vereiste configuraties voor Pulsar beschreven. U moet slechts één van de opties topic
opgeven, topics
of topicsPattern
.
Optie | Default value | Beschrijving |
---|---|---|
service.url |
Geen | De Pulsar-configuratie voor de Pulsar-service serviceUrl . |
topic |
Geen | Een tekenreeks voor de onderwerpnaam die het onderwerp moet gebruiken. |
topics |
Geen | Een door komma's gescheiden lijst met onderwerpen om te consumeren. |
topicsPattern |
Geen | Een Java regex-tekenreeks die overeenkomt met onderwerpen die moeten worden gebruikt. |
In de volgende tabel worden andere opties beschreven die worden ondersteund voor Pulsar:
Optie | Default value | Beschrijving |
---|---|---|
predefinedSubscription |
Geen | De vooraf gedefinieerde abonnementsnaam die door de connector wordt gebruikt om de voortgang van spark-toepassingen bij te houden. |
subscriptionPrefix |
Geen | Een voorvoegsel dat door de connector wordt gebruikt om een willekeurig abonnement te genereren om de voortgang van spark-toepassingen bij te houden. |
pollTimeoutMs |
120.000 | De time-out voor het lezen van berichten van Pulsar in milliseconden. |
waitingForNonExistedTopic |
false |
Of de connector moet wachten totdat de gewenste onderwerpen zijn gemaakt. |
failOnDataLoss |
true |
Hiermee bepaalt u of een query mislukt wanneer gegevens verloren gaan (bijvoorbeeld onderwerpen worden verwijderd of berichten worden verwijderd vanwege bewaarbeleid). |
allowDifferentTopicSchemas |
false |
Als meerdere onderwerpen met verschillende schema's worden gelezen, gebruikt u deze parameter om automatische deserialisatie van onderwerpwaarden op basis van schema's uit te schakelen. Alleen de onbewerkte waarden worden geretourneerd wanneer dit wordt true . |
startingOffsets |
latest |
Als latest de lezer de nieuwste records leest nadat deze is gestart. Als earliest , leest de lezer vanaf het vroegst mogelijke offset. De gebruiker kan ook een JSON-tekenreeks opgeven die een specifieke offset aangeeft. |
maxBytesPerTrigger |
Geen | Een zachte limiet van het maximum aantal bytes dat we per microbatch willen verwerken. Als dit is opgegeven, admin.url moet ook worden opgegeven. |
admin.url |
Geen | De Pulsar-configuratie serviceHttpUrl . Alleen nodig wanneer maxBytesPerTrigger is opgegeven. |
U kunt ook configuraties voor Pulsar-clients, beheerders en lezers opgeven met behulp van de volgende patronen:
Patroon | Koppeling naar opties voor conifiguratie |
---|---|
pulsar.client.* |
Pulsar-clientconfiguratie |
pulsar.admin.* |
Configuratie van Pulsar-beheerder |
pulsar.reader.* |
Configuratie van Pulsar-lezer |
JSON maken met begin offsets
U kunt handmatig een bericht-id maken om een specifieke offset op te geven en deze als JSON door te geven aan de optie startingOffsets
. In het volgende codevoorbeeld ziet u deze syntaxis:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()