订阅 Google Pub/Sub
Azure Databricks 提供了一个内置连接器,用于订阅 Databricks Runtime 13.3 LTS 及更高版本中的 Google 发布/订阅。 此连接器为订阅服务器中的记录提供一次性的处理语义。
注意
Pub/Sub 可能会发布重复的记录,记录到达订阅者手中的顺序也可能会打乱。 应编写 Azure Databricks 代码来处理重复记录和无序记录。
语法示例
下面的代码示例演示了用于配置从 Pub/Sub 读取的结构化流的基本语法:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
有关更多配置选项,请参阅“配置 Pub/Sub 流式处理读取”选项。
配置对 Pub/Sub 的访问权限
Databricks 建议在提供授权选项时使用机密。 授权连接需要以下选项:
clientEmail
clientId
privateKey
privateKeyId
下表描述了配置凭据所需的角色:
角色 | 必需或可选 | 使用方式 |
---|---|---|
roles/pubsub.viewer 或 roles/viewer |
必须 | 检查订阅是否存在并获取订阅 |
roles/pubsub.subscriber |
必须 | 从订阅中提取数据 |
roles/pubsub.editor 或 roles/editor |
可选 | 如果订阅不存在,则启用创建订阅,同时允许在流终止时使用 deleteSubscriptionOnStreamStop 删除订阅 |
Pub/Sub 架构
流的架构与从 Pub/Sub 提取的记录匹配,如下表所述:
字段 | 类型 |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
配置 Pub/Sub 流式传输读取的选项
下表描述了 Pub/Sub 支持的其他选项。 所有选项都在使用 .option("<optionName>", "<optionValue>")
语法进行结构化流式传输读取的过程中配置。
注意
某些 Pub/Sub 配置选项使用提取概念,而不是微批处理。 这反映了内部实现详细信息,并且选项与其他结构化流连接器中的辅助模型类似,但提取和处理记录除外。
选项 | 默认值 | 说明 |
---|---|---|
numFetchPartitions |
设置为流初始化时存在的执行程序数量的一半。 | 从订阅中提取记录的并行 Spark 任务数。 |
deleteSubscriptionOnStreamStop |
false |
如果为 true ,流式处理作业结束时将删除传递给流的订阅。 |
maxBytesPerTrigger |
无 | 每个触发的微批处理期间要处理的批大小的软限制。 |
maxRecordsPerFetch |
1000 | 在处理记录之前,要提取每个任务的记录数。 |
maxFetchPeriod |
10 秒 | 处理记录之前要提取的每个任务的持续时间。 Databricks 建议使用默认值。 |
Pub/Sub 的增量批处理语义
可以使用 Trigger.AvailableNow
通过增量批处理从 Pub/Sub 源使用可用记录。
Azure Databricks 在 Trigger.AvailableNow
设置中记录你开始读取的时间戳。 批次处理的记录包括之前获取的所有数据,以及时间戳小于记录流开始时间戳的任何新发布记录。
请参阅配置增量批处理。
监视流式处理指标
结构化流式处理进度指标报告提取的记录数和准备处理的记录数、提取的记录大小以及自流开始以来看到的重复项数。 以下是此类指标的示例:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
限制
Pub/Sub 不支持推理执行 (spark.speculation
)。