read_pubsub
流式表值函数
适用于: Databricks SQL Databricks Runtime 13.3 LTS 及更高版本
返回表,其中包含从主题的 Pub/Sub 中读取的记录。 仅支持流式处理查询。
语法
read_pubsub( { parameter => value } [, ...])
参数
read_pubsub
需要命名参数调用。
唯一需要的参数是 subscriptionId
、projectId
和 topicId
。 所有其他参数都是可选的。
有关完整的参数描述,请参阅配置 Pub/Sub 流式处理读取的选项。
Databricks 建议在提供授权选项时使用机密。 请参阅 secret 函数。
有关配置 Pub/Sub 访问权限的详细信息,请参阅配置 Pub/Sub。
参数 | 类型 | 描述 |
---|---|---|
subscriptionId |
STRING |
必需,分配给 Pub/Sub 订阅的唯一标识符。 |
projectId |
STRING |
必填项,与 Pub/Sub 主题相关联的 Google Cloud 项目 ID。 |
topicId |
STRING |
必填项,要订阅的 Pub/Sub 主题的 ID 或名称。 |
clientEmail |
STRING |
与用于身份验证的服务帐户关联的电子邮件地址。 |
clientId |
STRING |
与用于身份验证的服务帐户关联的客户端 ID。 |
privateKeyId |
STRING |
与服务帐户关联的私钥 ID。 |
privateKey |
STRING |
与用于身份验证的服务帐户关联的私钥。 |
当阅读 Pub/Sub 时,这些参数用于进一步微调:
参数 | 类型 | 描述 |
---|---|---|
numFetchPartitions |
STRING |
可选,具有默认执行程序数。 从订阅中提取记录的并行 Spark 任务数。 |
deleteSubscriptionOnStreamStop |
BOOLEAN |
可选,默认为 false 。 如果设置为 true,流式处理作业结束时将删除传递给流的订阅。 |
maxBytesPerTrigger |
STRING |
每个触发的微批处理期间要处理的批大小的软限制。 默认值为“无”。 |
maxRecordsPerFetch |
STRING |
在处理记录之前,要提取每个任务的记录数。 默认值为“1000”。 |
maxFetchPeriod |
STRING |
处理记录之前要提取的每个任务的持续时间。 默认值为“10s”。 |
返回
具有以下架构的 Pub/Sub 记录表。 属性列可以为 null,但所有其他列都不为 null。
名称 | 数据类型 | Nullable | 标准 | 描述 |
---|---|---|---|---|
messageId |
STRING |
否 | Pub/Sub 消息的唯一标识符。 | |
payload |
BINARY |
否 | Pub/Sub 消息的内容。 | |
attributes |
STRING |
是 | 表示 Pub/Sub 消息属性的键值对。 这是 json 编码的字符串。 | |
publishTimestampInMillis |
BIGINT |
否 | 发布消息时的时间戳,以毫秒为单位。 | |
sequenceNumber |
BIGINT |
否 | 记录分片中该记录的唯一标识符。 |
示例
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
现在需要从 testing.streaming_table
查询数据以进行进一步分析。
错误查询:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);