Apache Pulsar에서 스트림
Important
이 기능은 공개 미리 보기 상태입니다.
Databricks Runtime 14.1 이상에서는 구조적 스트리밍을 사용하여 Azure Databricks의 Apache Pulsar에서 데이터를 스트리밍할 수 있습니다.
구조적 스트리밍은 Pulsar 원본에서 읽은 데이터에 대해 정확히 한 번 처리 의미 체계를 제공합니다.
구문 예시
다음은 구조적 스트리밍을 사용하여 Pulsar에서 읽는 기본 예제입니다.
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
항목을 지정하려면 항상 다음 옵션 중 하나를 제공해야 service.url
합니다.
topic
topics
topicsPattern
전체 옵션 목록은 펄서 스트리밍 읽기에 대한 옵션 구성을 참조 하세요.
Pulsar에 인증
Azure Databricks는 Pulsar에 대한 truststore 및 키 저장소 인증을 지원합니다. Databricks는 구성 세부 정보를 저장할 때 비밀을 사용하는 것이 좋습니다.
스트림 구성 중에 다음 옵션을 설정할 수 있습니다.
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
스트림에서 다음을 PulsarAdmin
사용하는 경우 다음도 설정합니다.
pulsar.admin.authPluginClassName
pulsar.admin.authParams
다음 예제에서는 인증 옵션을 구성하는 방법을 보여 줍니다.
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
펄서 스키마
Pulsar에서 읽은 레코드의 스키마는 토픽의 스키마를 인코딩하는 방법에 따라 달라집니다.
- Avro 또는 JSON 스키마가 있는 토픽의 경우 결과 Spark DataFrame에서 필드 이름 및 필드 형식이 유지됩니다.
- 스키마가 없거나 Pulsar의 단순 데이터 형식이 있는 토픽의 경우 페이로드가 열에
value
로드됩니다. - 판독기에서 스키마가 다른 여러 항목을 읽도록 구성된 경우 원시 콘텐츠를 열에 로드하도록
value
설정합니다allowDifferentTopicSchemas
.
펄서 레코드에는 다음과 같은 메타데이터 필드가 있습니다.
Column | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
펄서 스트리밍 읽기에 대한 옵션 구성
모든 옵션은 구문을 사용하여 .option("<optionName>", "<optionValue>")
구조적 스트리밍 읽기의 일부로 구성됩니다. 옵션을 사용하여 인증을 구성할 수도 있습니다. 펄서 인증을 참조하세요.
다음 표에서는 Pulsar에 필요한 구성에 대해 설명합니다. 옵션 topic
topics
topicsPattern
중 하나만 지정해야 합니다.
옵션 | 기본값 | 설명 |
---|---|---|
service.url |
없음 | Pulsar serviceUrl 서비스에 대한 펄서 구성입니다. |
topic |
없음 | 사용할 토픽의 토픽 이름 문자열입니다. |
topics |
없음 | 사용할 항목의 쉼표로 구분된 목록입니다. |
topicsPattern |
없음 | 사용할 토픽과 일치하는 Java regex 문자열입니다. |
다음 표에서는 Pulsar에 지원되는 다른 옵션에 대해 설명합니다.
옵션 | 기본값 | 설명 |
---|---|---|
predefinedSubscription |
없음 | Spark 애플리케이션 진행률을 추적하기 위해 커넥터에서 사용하는 미리 정의된 구독 이름입니다. |
subscriptionPrefix |
없음 | Spark 애플리케이션 진행률을 추적하기 위해 임의 구독을 생성하기 위해 커넥터에서 사용하는 접두사입니다. |
pollTimeoutMs |
120000 | Pulsar에서 메시지를 읽기 위한 시간 제한(밀리초)입니다. |
waitingForNonExistedTopic |
false |
커넥터가 원하는 토픽이 생성될 때까지 기다려야 하는지 여부입니다. |
failOnDataLoss |
true |
데이터가 손실될 때 쿼리에 실패할지 여부를 제어합니다(예: 토픽이 삭제되거나 보존 정책으로 인해 메시지가 삭제됨). |
allowDifferentTopicSchemas |
false |
스키마가 다른 여러 항목을 읽는 경우 이 매개 변수를 사용하여 자동 스키마 기반 토픽 값 역직렬화를 해제합니다. 이 경우 원시 값만 반환됩니다 true . |
startingOffsets |
latest |
이 경우 latest 판독기는 실행을 시작한 후 최신 레코드를 읽습니다. 이 경우 earliest 판독기는 가장 빠른 오프셋에서 읽습니다. 사용자는 특정 오프셋을 지정하는 JSON 문자열을 지정할 수도 있습니다. |
maxBytesPerTrigger |
없음 | 마이크로배치당 처리하려는 최대 바이트 수의 소프트 제한입니다. 이 항목이 지정되면 admin.url 지정해야 합니다. |
admin.url |
없음 | 펄서 serviceHttpUrl 구성입니다. 지정된 경우에만 maxBytesPerTrigger 필요합니다. |
다음 패턴을 사용하여 Pulsar 클라이언트, 관리자 및 판독기 구성을 지정할 수도 있습니다.
패턴 | 구성 옵션에 연결 |
---|---|
pulsar.client.* |
펄서 클라이언트 구성 |
pulsar.admin.* |
Pulsar 관리자 구성 |
pulsar.reader.* |
펄서 판독기 구성 |
시작 오프셋 JSON 생성
메시지 ID를 수동으로 생성하여 특정 오프셋을 지정하고 이를 JSON으로 옵션에 startingOffsets
전달할 수 있습니다. 다음 코드 예제에서는 이 구문을 보여 줍니다.
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()