Streamen von Apache Pulsar
Wichtig
Dieses Feature befindet sich in der Public Preview.
In Databricks Runtime 14.1 und höher können Sie strukturiertes Streaming verwenden, um Daten von Apache Pulsar in Azure Databricks zu streamen.
Strukturiertes Streaming bietet eine Semantik für die einmalige Verarbeitung von Daten, die aus Pulsar-Quellen gelesen werden.
Syntaxbeispiel
Im Folgenden finden Sie ein grundlegendes Beispiel für die Verwendung von strukturiertem Streaming zum Lesen von Pulsar:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Zur Angabe von Themen müssen Sie immer eine service.url
und eine der folgenden Optionen angeben:
topic
topics
topicsPattern
Eine vollständige Liste der Optionen finden Sie unter Konfigurieren von Optionen für Pulsar-Streaming-Lesevorgänge.
Authentifizieren bei Pulsar
Azure Databricks unterstützt die Truststore- und Keystore-Authentifizierung bei Pulsar. Databricks empfiehlt die Verwendung geheimer Schlüssel zum Speichern von Konfigurationsdetails.
Sie können die folgenden Optionen während der Streamkonfiguration festlegen:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Wenn der Stream eine PulsarAdmin
verwendet, legen Sie auch Folgendes fest:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
Das folgende Beispiel veranschaulicht die Konfiguration der Authentifizierungsoptionen:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar-Schema
Das Schema der aus Pulsar gelesenen Datensätze hängt davon ab, wie die Schemata in den Themen codiert wurden.
- Bei Themen mit Avro- oder JSON-Schema bleiben Feldnamen und Feldtypen im resultierenden Spark DataFrame erhalten.
- Bei Themen ohne Schema oder mit einem einfachen Datentyp in Pulsar werden die Nutzdaten in eine
value
-Spalte geladen. - Wenn der Leser für das Lesen mehrerer Themen mit unterschiedlichen Schemata konfiguriert ist, legen Sie
allowDifferentTopicSchemas
fest, um den unformatierten Inhalt in einevalue
-Spalte zu laden.
Pulsar-Datensätze verfügen über die folgenden Metadatenfelder:
Spalte | Typ |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Konfigurieren von Optionen für Pulsar-Streaming-Lesevorgänge
Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")
-Syntax konfiguriert. Sie können die Authentifizierung auch mit Optionen konfigurieren. Weitere Informationen finden Sie unter Authentifizieren bei Pulsar.
In der folgenden Tabelle werden die erforderlichen Konfigurationen für Pulsar beschrieben. Sie dürfen nur eine der Optionen topic
, topics
oder topicsPattern
angeben.
Option | Standardwert | BESCHREIBUNG |
---|---|---|
service.url |
Keine | Die Pulsar serviceUrl -Konfiguration für den Pulsar-Dienst. |
topic |
Keine | Eine Zeichenfolge für den Namen des zu verwendenden Themas. |
topics |
Keine | Eine durch Kommas getrennte Liste der zu verwendenden Themen. |
topicsPattern |
Keine | Eine RegEx-Zeichenfolge für Java zum Suchen nach Themen, die verwendet werden sollen. |
In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pulsar unterstützt werden:
Option | Standardwert | BESCHREIBUNG |
---|---|---|
predefinedSubscription |
Keine | Der vordefinierte Abonnementname, der vom Connector verwendet wird, um den Spark-Anwendungsstatus nachzuverfolgen. |
subscriptionPrefix |
Keine | Ein Präfix, das vom Connector verwendet wird, um ein zufälliges Abonnement zu generieren, um den Spark-Anwendungsstatus nachzuverfolgen. |
pollTimeoutMs |
120000 | Das Timeout zum Lesen von Nachrichten von Pulsar in Millisekunden. |
waitingForNonExistedTopic |
false |
Gibt an, ob der Connector warten soll, bis die gewünschten Themen erstellt worden sind. |
failOnDataLoss |
true |
Steuert, ob eine Abfrage fehlschlägt, wenn Daten verloren gehen (z. B. wenn Themen gelöscht werden oder Nachrichten aufgrund einer Aufbewahrungsrichtlinie gelöscht werden). |
allowDifferentTopicSchemas |
false |
Wenn mehrere Themen mit unterschiedlichen Schemata gelesen werden, verwenden Sie diesen Parameter, um die automatische schemabasierte Deserialisierung von Themenwerten zu deaktivieren. Wenn dies true ist, werden nur die Rohwerte zurückgegeben. |
startingOffsets |
latest |
Wenn latest , liest der Leser die neuesten Datensätze, nachdem er gestartet wurde. Wenn earliest , liest der Leser ab dem frühesten Offset. Der Benutzer kann auch eine JSON-Zeichenfolge angeben, die einen bestimmten Offset festlegt. |
maxBytesPerTrigger |
Keine | Eine weiche Grenze der maximalen Anzahl von Bytes, die pro Mikrobatch verarbeitet werden sollen. Wenn dies angegeben wird, muss auch admin.url angegeben werden. |
admin.url |
Keine | Die Pulsar serviceHttpUrl -Konfiguration. Nur erforderlich, wenn maxBytesPerTrigger angegeben wird. |
Sie können mit den folgenden Mustern auch beliebige Pulsar-Client-, Administrator- und Lesekonfigurationen angeben:
Muster | Link zu den Konfigurationsoptionen |
---|---|
pulsar.client.* |
Pulsar-Clientkonfiguration |
pulsar.admin.* |
Pulsar-Administratorkonfiguration |
pulsar.reader.* |
Pulsar-Leserkonfiguration |
Konstrukt von Startoffsets in JSON
Sie können manuell eine Nachrichten-ID erstellen, um einen bestimmten Offset anzugeben und diese als JSON an die Option startingOffsets
zu übergeben. Im folgenden Codebeispiel wird diese Syntax veranschaulicht:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()