Freigeben über


Streamen von Apache Pulsar

Wichtig

Dieses Feature befindet sich in der Public Preview.

In Databricks Runtime 14.1 und höher können Sie strukturiertes Streaming verwenden, um Daten von Apache Pulsar in Azure Databricks zu streamen.

Strukturiertes Streaming bietet eine Semantik für die einmalige Verarbeitung von Daten, die aus Pulsar-Quellen gelesen werden.

Syntaxbeispiel

Im Folgenden finden Sie ein grundlegendes Beispiel für die Verwendung von strukturiertem Streaming zum Lesen von Pulsar:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Zur Angabe von Themen müssen Sie immer eine service.url und eine der folgenden Optionen angeben:

  • topic
  • topics
  • topicsPattern

Eine vollständige Liste der Optionen finden Sie unter Konfigurieren von Optionen für Pulsar-Streaming-Lesevorgänge.

Authentifizieren bei Pulsar

Azure Databricks unterstützt die Truststore- und Keystore-Authentifizierung bei Pulsar. Databricks empfiehlt die Verwendung geheimer Schlüssel zum Speichern von Konfigurationsdetails.

Sie können die folgenden Optionen während der Streamkonfiguration festlegen:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Wenn der Stream eine PulsarAdmin verwendet, legen Sie auch Folgendes fest:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Das folgende Beispiel veranschaulicht die Konfiguration der Authentifizierungsoptionen:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulsar-Schema

Das Schema der aus Pulsar gelesenen Datensätze hängt davon ab, wie die Schemata in den Themen codiert wurden.

  • Bei Themen mit Avro- oder JSON-Schema bleiben Feldnamen und Feldtypen im resultierenden Spark DataFrame erhalten.
  • Bei Themen ohne Schema oder mit einem einfachen Datentyp in Pulsar werden die Nutzdaten in eine value-Spalte geladen.
  • Wenn der Leser für das Lesen mehrerer Themen mit unterschiedlichen Schemata konfiguriert ist, legen Sie allowDifferentTopicSchemas fest, um den unformatierten Inhalt in eine value-Spalte zu laden.

Pulsar-Datensätze verfügen über die folgenden Metadatenfelder:

Spalte Typ
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Konfigurieren von Optionen für Pulsar-Streaming-Lesevorgänge

Alle Optionen werden als Teil eines strukturierten Streaming-Lesevorgangs mit .option("<optionName>", "<optionValue>")-Syntax konfiguriert. Sie können die Authentifizierung auch mit Optionen konfigurieren. Weitere Informationen finden Sie unter Authentifizieren bei Pulsar.

In der folgenden Tabelle werden die erforderlichen Konfigurationen für Pulsar beschrieben. Sie dürfen nur eine der Optionen topic, topics oder topicsPattern angeben.

Option Standardwert BESCHREIBUNG
service.url Keine Die Pulsar serviceUrl-Konfiguration für den Pulsar-Dienst.
topic Keine Eine Zeichenfolge für den Namen des zu verwendenden Themas.
topics Keine Eine durch Kommas getrennte Liste der zu verwendenden Themen.
topicsPattern Keine Eine RegEx-Zeichenfolge für Java zum Suchen nach Themen, die verwendet werden sollen.

In der folgenden Tabelle werden weitere Optionen beschrieben, die für Pulsar unterstützt werden:

Option Standardwert BESCHREIBUNG
predefinedSubscription Keine Der vordefinierte Abonnementname, der vom Connector verwendet wird, um den Spark-Anwendungsstatus nachzuverfolgen.
subscriptionPrefix Keine Ein Präfix, das vom Connector verwendet wird, um ein zufälliges Abonnement zu generieren, um den Spark-Anwendungsstatus nachzuverfolgen.
pollTimeoutMs 120000 Das Timeout zum Lesen von Nachrichten von Pulsar in Millisekunden.
waitingForNonExistedTopic false Gibt an, ob der Connector warten soll, bis die gewünschten Themen erstellt worden sind.
failOnDataLoss true Steuert, ob eine Abfrage fehlschlägt, wenn Daten verloren gehen (z. B. wenn Themen gelöscht werden oder Nachrichten aufgrund einer Aufbewahrungsrichtlinie gelöscht werden).
allowDifferentTopicSchemas false Wenn mehrere Themen mit unterschiedlichen Schemata gelesen werden, verwenden Sie diesen Parameter, um die automatische schemabasierte Deserialisierung von Themenwerten zu deaktivieren. Wenn dies true ist, werden nur die Rohwerte zurückgegeben.
startingOffsets latest Wenn latest, liest der Leser die neuesten Datensätze, nachdem er gestartet wurde. Wenn earliest, liest der Leser ab dem frühesten Offset. Der Benutzer kann auch eine JSON-Zeichenfolge angeben, die einen bestimmten Offset festlegt.
maxBytesPerTrigger Keine Eine weiche Grenze der maximalen Anzahl von Bytes, die pro Mikrobatch verarbeitet werden sollen. Wenn dies angegeben wird, muss auch admin.url angegeben werden.
admin.url Keine Die Pulsar serviceHttpUrl-Konfiguration. Nur erforderlich, wenn maxBytesPerTrigger angegeben wird.

Sie können mit den folgenden Mustern auch beliebige Pulsar-Client-, Administrator- und Lesekonfigurationen angeben:

Muster Link zu den Konfigurationsoptionen
pulsar.client.* Pulsar-Clientkonfiguration
pulsar.admin.* Pulsar-Administratorkonfiguration
pulsar.reader.* Pulsar-Leserkonfiguration

Konstrukt von Startoffsets in JSON

Sie können manuell eine Nachrichten-ID erstellen, um einen bestimmten Offset anzugeben und diese als JSON an die Option startingOffsets zu übergeben. Im folgenden Codebeispiel wird diese Syntax veranschaulicht:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()