read_pubsub 流式表值函数

适用于:勾选“是” Databricks SQL 勾选“是” Databricks Runtime 13.3 LTS 及更高版本

返回表,其中包含从主题的 Pub/Sub 中读取的记录。 仅支持流式处理查询。

语法

read_pubsub( { parameter => value } [, ...])

参数

read_pubsub 需要命名参数调用

唯一需要的参数是 subscriptionIdprojectIdtopicId。 所有其他参数都是可选的。

有关完整的参数描述,请参阅配置 Pub/Sub 流式处理读取的选项

Databricks 建议在提供授权选项时使用机密。 请参阅 secret 函数

有关配置 Pub/Sub 访问权限的详细信息,请参阅配置 Pub/Sub

参数 类型 描述
subscriptionId STRING 必需,分配给 Pub/Sub 订阅的唯一标识符。
projectId STRING 必填项,与 Pub/Sub 主题相关联的 Google Cloud 项目 ID。
topicId STRING 必填项,要订阅的 Pub/Sub 主题的 ID 或名称。
clientEmail STRING 与用于身份验证的服务帐户关联的电子邮件地址。
clientId STRING 与用于身份验证的服务帐户关联的客户端 ID。
privateKeyId STRING 与服务帐户关联的私钥 ID。
privateKey STRING 与用于身份验证的服务帐户关联的私钥。

当阅读 Pub/Sub 时,这些参数用于进一步微调:

参数 类型 描述
numFetchPartitions STRING 可选,具有默认执行程序数。 从订阅中提取记录的并行 Spark 任务数。
deleteSubscriptionOnStreamStop BOOLEAN 可选,默认为 false。 如果设置为 true,流式处理作业结束时将删除传递给流的订阅。
maxBytesPerTrigger STRING 每个触发的微批处理期间要处理的批大小的软限制。 默认值为“无”。
maxRecordsPerFetch STRING 在处理记录之前,要提取每个任务的记录数。 默认值为“1000”。
maxFetchPeriod STRING 处理记录之前要提取的每个任务的持续时间。 默认值为“10s”。

返回

具有以下架构的 Pub/Sub 记录表。 属性列可以为 null,但所有其他列都不为 null。

名称 数据类型 Nullable 标准 描述
messageId STRING Pub/Sub 消息的唯一标识符。
payload BINARY Pub/Sub 消息的内容。
attributes STRING 表示 Pub/Sub 消息属性的键值对。 这是 json 编码的字符串。
publishTimestampInMillis BIGINT 发布消息时的时间戳,以毫秒为单位。
sequenceNumber BIGINT 记录分片中该记录的唯一标识符。

示例

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

现在需要从 testing.streaming_table 查询数据以进行进一步分析。

错误查询:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);