Przesyłanie strumieniowe z pulsu Apache
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
W środowisku Databricks Runtime 14.1 lub nowszym można użyć przesyłania strumieniowego ze strukturą do strumieniowego przesyłania danych z usługi Apache Pulsar w usłudze Azure Databricks.
Przesyłanie strumieniowe ze strukturą zapewnia semantyka przetwarzania dokładnie raz dla danych odczytywanych ze źródeł Pulsar.
Przykład składni
Poniżej przedstawiono podstawowy przykład użycia przesyłania strumieniowego ze strukturą do odczytu z pulsaru:
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Aby określić tematy, należy zawsze podać element service.url
i jedną z następujących opcji:
topic
topics
topicsPattern
Aby uzyskać pełną listę opcji, zobacz Konfigurowanie opcji odczytu pulsar przesyłania strumieniowego.
Uwierzytelnianie w pulsie
Usługa Azure Databricks obsługuje uwierzytelnianie magazynu zaufania i magazynu kluczy w usłudze Pulsar. Usługa Databricks zaleca używanie wpisów tajnych podczas przechowywania szczegółów konfiguracji.
Podczas konfigurowania strumienia można ustawić następujące opcje:
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
Jeśli strumień używa PulsarAdmin
elementu , ustaw również następujące ustawienia:
pulsar.admin.authPluginClassName
pulsar.admin.authParams
W poniższym przykładzie przedstawiono konfigurowanie opcji uwierzytelniania:
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Schemat pulsu
Schemat rekordów odczytywanych z Pulsar zależy od tego, jak tematy mają zakodowane schematy.
- W przypadku tematów ze schematem Avro lub JSON nazwy pól i typy pól są zachowywane w wynikowej ramce danych platformy Spark.
- W przypadku tematów bez schematu lub prostego typu danych w Pulsar ładunek jest ładowany do
value
kolumny. - Jeśli czytelnik jest skonfigurowany do odczytywania wielu tematów z różnymi schematami, ustaw opcję
allowDifferentTopicSchemas
ładowania nieprzetworzonejvalue
zawartości do kolumny.
Rekordy pulsarne mają następujące pola metadanych:
Kolumna | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Konfigurowanie opcji odczytu przesyłania strumieniowego Pulsar
Wszystkie opcje są konfigurowane w ramach odczytu przesyłania strumieniowego ze strukturą przy użyciu .option("<optionName>", "<optionValue>")
składni. Uwierzytelnianie można również skonfigurować przy użyciu opcji. Zobacz Uwierzytelnianie w pulsacie.
W poniższej tabeli opisano wymagane konfiguracje pulsaru. Należy określić tylko jedną z opcji topic
lub topics
topicsPattern
.
Opcja | Wartość domyślna | Opis |
---|---|---|
service.url |
Brak | Konfiguracja Pulsar serviceUrl dla usługi Pulsar. |
topic |
Brak | Ciąg nazwy tematu do użytku. |
topics |
Brak | Rozdzielona przecinkami lista tematów do użytku. |
topicsPattern |
Brak | Ciąg wyrażeń regularnych Języka Java, który ma być zgodny z tematami do korzystania. |
W poniższej tabeli opisano inne opcje obsługiwane przez pulsar:
Opcja | Wartość domyślna | Opis |
---|---|---|
predefinedSubscription |
Brak | Wstępnie zdefiniowana nazwa subskrypcji używana przez łącznik do śledzenia postępu aplikacji platformy Spark. |
subscriptionPrefix |
Brak | Prefiks używany przez łącznik do generowania losowej subskrypcji do śledzenia postępu aplikacji platformy Spark. |
pollTimeoutMs |
120000 | Limit czasu odczytywania komunikatów z Pulsar w milisekundach. |
waitingForNonExistedTopic |
false |
Czy łącznik powinien czekać na utworzenie żądanych tematów. |
failOnDataLoss |
true |
Określa, czy zapytanie nie powiodło się w przypadku utraty danych (na przykład tematy są usuwane lub komunikaty są usuwane z powodu zasad przechowywania). |
allowDifferentTopicSchemas |
false |
Jeśli wiele tematów z różnymi schematami jest odczytywanych, użyj tego parametru, aby wyłączyć automatyczne deserializacji wartości tematu opartego na schemacie. Tylko nieprzetworzone wartości są zwracane, gdy jest true to . |
startingOffsets |
latest |
Jeśli latest element , czytelnik odczytuje najnowsze rekordy po uruchomieniu. Jeśli earliest element , czytnik odczytuje od najwcześniejszego przesunięcia. Użytkownik może również określić ciąg JSON, który określa określone przesunięcie. |
maxBytesPerTrigger |
Brak | Miękki limit maksymalnej liczby bajtów, które chcemy przetworzyć na mikrobajt. Jeśli jest to określone, admin.url należy również określić. |
admin.url |
Brak | Konfiguracja Pulsar serviceHttpUrl . Wymagane tylko wtedy, gdy maxBytesPerTrigger jest określony. |
Można również określić dowolną konfigurację klienta pulsar, administratora i czytelnika przy użyciu następujących wzorców:
Wzorzec | Łącze do opcji konifiguracji |
---|---|
pulsar.client.* |
Konfiguracja klienta Pulsar |
pulsar.admin.* |
Konfiguracja administratora pulsu |
pulsar.reader.* |
Konfiguracja czytnika pulsar |
Konstruowanie przesunięcia początkowego JSON
Możesz ręcznie skonstruować identyfikator komunikatu, aby określić określone przesunięcie i przekazać go jako kod JSON do startingOffsets
opcji. W poniższym przykładzie kodu pokazano tę składnię:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()