Streamování z Apache Pulsearu
Důležité
Tato funkce je ve verzi Public Preview.
Ve službě Databricks Runtime 14.1 a novějších můžete pomocí strukturovaného streamování streamovat data z Apache Pulsear v Azure Databricks.
Strukturované streamování poskytuje sémantiku zpracování přesně jednou pro data načtená z pulsárních zdrojů.
Příklad syntaxe
Následuje základní příklad použití strukturovaného streamování ke čtení z Pulsaru:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
K zadání témat musíte vždy zadat service.url
jednu z následujících možností:
topic
topics
topicsPattern
Úplný seznam možností najdete v tématu Konfigurace možností pro čtení streamování Pulsar.
Ověření ve službě Pulsear
Azure Databricks podporuje ověřování úložiště důvěryhodnosti a úložiště klíčů pro Pulsear. Databricks doporučuje při ukládání podrobností o konfiguraci používat tajné kódy.
Během konfigurace streamu můžete nastavit následující možnosti:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Pokud datový proud používá PulsarAdmin
, nastavte také následující:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
Následující příklad ukazuje konfiguraci možností ověřování:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulzární schéma
Schéma záznamů přečtených z Pulsaru závisí na tom, jak mají témata svá schémata kódovaná.
- U témat se schématem Avro nebo JSON se názvy polí a typy polí zachovají ve výsledném datovém rámci Sparku.
- V případě témat bez schématu nebo jednoduchého datového typu v Pulsearu se datová část načte do
value
sloupce. - Pokud je čtenář nakonfigurovaný tak, aby četl více témat s různými schématy, nastavte
allowDifferentTopicSchemas
načíst nezpracovaný obsah dovalue
sloupce.
Záznamy pulsar mají následující pole metadat:
Column | Typ |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Konfigurace možností pro čtení streamování Pulsear
Všechny možnosti jsou nakonfigurované jako součást syntaxe čtení .option("<optionName>", "<optionValue>")
strukturovaného streamování. Ověřování můžete nakonfigurovat také pomocí možností. Viz Ověření v Pulsaru.
Následující tabulka popisuje požadované konfigurace pro Pulsear. Je nutné zadat pouze jednu z možností topic
, topics
nebo topicsPattern
.
Možnost | Výchozí hodnota | Popis |
---|---|---|
service.url |
Žádná | Konfigurace Pulsar serviceUrl pro službu Pulsear. |
topic |
Žádná | Řetězec názvu tématu, který se má použít. |
topics |
Žádná | Seznam témat oddělených čárkami, která se mají používat. |
topicsPattern |
Žádná | Řetězec regulárního výrazu Java, který se má shodovat s tématy, která se mají používat. |
Následující tabulka popisuje další možnosti podporované pro Pulsear:
Možnost | Výchozí hodnota | Popis |
---|---|---|
predefinedSubscription |
Žádná | Předdefinovaný název předplatného používaný konektorem ke sledování průběhu aplikace Spark. |
subscriptionPrefix |
Žádná | Předpona používaná konektorem ke generování náhodného předplatného ke sledování průběhu aplikace Spark. |
pollTimeoutMs |
120000 | Časový limit pro čtení zpráv z Pulsaru v milisekundách. |
waitingForNonExistedTopic |
false |
Určuje, jestli má konektor čekat na vytvoření požadovaných témat. |
failOnDataLoss |
true |
Určuje, jestli se dotaz nezdaří, když dojde ke ztrátě dat (například témata se odstraní nebo zprávy se odstraní kvůli zásadám uchovávání informací). |
allowDifferentTopicSchemas |
false |
Pokud je přečteno více témat s různými schématy, pomocí tohoto parametru vypněte automatickou deserializaci hodnot tématu založených na schématu. Pouze nezpracované hodnoty jsou vráceny, pokud je true to . |
startingOffsets |
latest |
Pokud latest čtečka přečte nejnovější záznamy po spuštění. Pokud earliest čtečka čte od nejstaršího posunu. Uživatel může také zadat řetězec JSON, který určuje konkrétní posun. |
maxBytesPerTrigger |
Žádná | Měkký limit maximálního počtu bajtů, které chceme zpracovat na mikrobatch. Je-li zadána tato hodnota, admin.url je třeba zadat také. |
admin.url |
Žádná | Konfigurace Pulsar serviceHttpUrl . Je potřeba pouze v případě, že maxBytesPerTrigger je zadán. |
Pomocí následujících vzorů můžete také zadat libovolné konfigurace klienta Pulsear, správce a čtenáře:
Vzor | Odkaz na možnosti konfigurace |
---|---|
pulsar.client.* |
Konfigurace klienta Pulsar |
pulsar.admin.* |
Konfigurace správce Pulsar |
pulsar.reader.* |
Konfigurace pulsární čtečky |
Vytvoření počátečních posunů JSON
ID zprávy můžete vytvořit ručně, abyste zadali konkrétní posun a předali ho jako JSON možnosti startingOffsets
. Následující příklad kódu ukazuje tuto syntaxi:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()