Sdílet prostřednictvím


Streamování z Apache Pulsearu

Důležité

Tato funkce je ve verzi Public Preview.

Ve službě Databricks Runtime 14.1 a novějších můžete pomocí strukturovaného streamování streamovat data z Apache Pulsear v Azure Databricks.

Strukturované streamování poskytuje sémantiku zpracování přesně jednou pro data načtená z pulsárních zdrojů.

Příklad syntaxe

Následuje základní příklad použití strukturovaného streamování ke čtení z Pulsaru:

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

K zadání témat musíte vždy zadat service.url jednu z následujících možností:

  • topic
  • topics
  • topicsPattern

Úplný seznam možností najdete v tématu Konfigurace možností pro čtení streamování Pulsar.

Ověření ve službě Pulsear

Azure Databricks podporuje ověřování úložiště důvěryhodnosti a úložiště klíčů pro Pulsear. Databricks doporučuje při ukládání podrobností o konfiguraci používat tajné kódy.

Během konfigurace streamu můžete nastavit následující možnosti:

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

Pokud datový proud používá PulsarAdmin, nastavte také následující:

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

Následující příklad ukazuje konfiguraci možností ověřování:

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Pulzární schéma

Schéma záznamů přečtených z Pulsaru závisí na tom, jak mají témata svá schémata kódovaná.

  • U témat se schématem Avro nebo JSON se názvy polí a typy polí zachovají ve výsledném datovém rámci Sparku.
  • V případě témat bez schématu nebo jednoduchého datového typu v Pulsearu se datová část načte do value sloupce.
  • Pokud je čtenář nakonfigurovaný tak, aby četl více témat s různými schématy, nastavte allowDifferentTopicSchemas načíst nezpracovaný obsah do value sloupce.

Záznamy pulsar mají následující pole metadat:

Column Typ
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Konfigurace možností pro čtení streamování Pulsear

Všechny možnosti jsou nakonfigurované jako součást syntaxe čtení .option("<optionName>", "<optionValue>") strukturovaného streamování. Ověřování můžete nakonfigurovat také pomocí možností. Viz Ověření v Pulsaru.

Následující tabulka popisuje požadované konfigurace pro Pulsear. Je nutné zadat pouze jednu z možností topic, topics nebo topicsPattern.

Možnost Výchozí hodnota Popis
service.url Žádná Konfigurace Pulsar serviceUrl pro službu Pulsear.
topic Žádná Řetězec názvu tématu, který se má použít.
topics Žádná Seznam témat oddělených čárkami, která se mají používat.
topicsPattern Žádná Řetězec regulárního výrazu Java, který se má shodovat s tématy, která se mají používat.

Následující tabulka popisuje další možnosti podporované pro Pulsear:

Možnost Výchozí hodnota Popis
predefinedSubscription Žádná Předdefinovaný název předplatného používaný konektorem ke sledování průběhu aplikace Spark.
subscriptionPrefix Žádná Předpona používaná konektorem ke generování náhodného předplatného ke sledování průběhu aplikace Spark.
pollTimeoutMs 120000 Časový limit pro čtení zpráv z Pulsaru v milisekundách.
waitingForNonExistedTopic false Určuje, jestli má konektor čekat na vytvoření požadovaných témat.
failOnDataLoss true Určuje, jestli se dotaz nezdaří, když dojde ke ztrátě dat (například témata se odstraní nebo zprávy se odstraní kvůli zásadám uchovávání informací).
allowDifferentTopicSchemas false Pokud je přečteno více témat s různými schématy, pomocí tohoto parametru vypněte automatickou deserializaci hodnot tématu založených na schématu. Pouze nezpracované hodnoty jsou vráceny, pokud je trueto .
startingOffsets latest Pokud latestčtečka přečte nejnovější záznamy po spuštění. Pokud earliestčtečka čte od nejstaršího posunu. Uživatel může také zadat řetězec JSON, který určuje konkrétní posun.
maxBytesPerTrigger Žádná Měkký limit maximálního počtu bajtů, které chceme zpracovat na mikrobatch. Je-li zadána tato hodnota, admin.url je třeba zadat také.
admin.url Žádná Konfigurace Pulsar serviceHttpUrl . Je potřeba pouze v případě, že maxBytesPerTrigger je zadán.

Pomocí následujících vzorů můžete také zadat libovolné konfigurace klienta Pulsear, správce a čtenáře:

Vzor Odkaz na možnosti konfigurace
pulsar.client.* Konfigurace klienta Pulsar
pulsar.admin.* Konfigurace správce Pulsar
pulsar.reader.* Konfigurace pulsární čtečky

Vytvoření počátečních posunů JSON

ID zprávy můžete vytvořit ručně, abyste zadali konkrétní posun a předali ho jako JSON možnosti startingOffsets . Následující příklad kódu ukazuje tuto syntaxi:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()