read_pubsub
串流 table-valued 函式
適用於: Databricks SQL
Databricks Runtime 13.3 LTS 和更新版本
傳回一個 table,其中包含從 Pub/Sub 中某主題讀取的記錄。 僅支援串流查詢。
語法
read_pubsub( { parameter => value } [, ...])
引數
read_pubsub
需要具名參數調用。
唯一必要的自變數是 subscriptionId
、 projectId
和 topicId
。 所有其他自變數都是選擇性的。
如需完整的自變數描述,請參閱 設定發佈/子串流讀取的選項。
Databricks 建議在提供授權選項時使用秘密。 請參閱 秘密函式。
如需設定 Pub/Sub 存取權的詳細資訊,請參閱 設定 Pub/Sub 的存取權。
參數 | 類型 | 描述 |
---|---|---|
subscriptionId |
STRING |
必要,指派給 Pub/Sub 訂閱的唯一識別碼 identifier。 |
projectId |
STRING |
必要專案標識碼,與 Pub/Sub 主題相關聯的 Google Cloud 項目識別碼。 |
topicId |
STRING |
必要,要訂閱的 Pub/Sub 主題識別碼或名稱。 |
clientEmail |
STRING |
與服務帳戶相關聯的電子郵件地址進行驗證。 |
clientId |
STRING |
與服務帳戶相關聯的用戶端標識碼以進行驗證。 |
privateKeyId |
STRING |
與服務帳戶相關聯的私鑰標識碼。 |
privateKey |
STRING |
與服務帳戶相關聯的私鑰進行驗證。 |
從 Pub/Sub 讀取時,這些自變數會用於進一步微調:
參數 | 類型 | 描述 |
---|---|---|
numFetchPartitions |
STRING |
選擇性,具有預設的執行程序數目。 從訂用帳戶擷取記錄的平行 Spark 工作數目。 |
deleteSubscriptionOnStreamStop |
BOOLEAN |
選擇性,預設 false 為 。 如果 set 為 true,當串流作業結束時,會刪除傳遞至串流的訂閱。 |
maxBytesPerTrigger |
STRING |
每當觸發微批次時,需處理的批次大小設為軟性 limit。 預設值為 『none』。 |
maxRecordsPerFetch |
STRING |
處理記錄之前要擷取每個工作的記錄數目。 預設值為 『1000』。 |
maxFetchPeriod |
STRING |
處理記錄之前,每個工作要擷取的時間持續時間。 預設值為 『10s』。 |
傳回
具有下列 schema的 Pub/Sub 記錄 table。 column 的屬性可以是空值,但所有其他 columns 的屬性都不能是空值。
名稱 | 資料類型 | Nullable | 標準 | 描述 |
---|---|---|---|---|
messageId |
STRING |
No | Pub/Sub 訊息的「唯一 identifier」。 | |
payload |
BINARY |
No | Pub/Sub 訊息的內容。 | |
attributes |
STRING |
Yes | 代表 Pub/Sub 訊息屬性的索引鍵/值組。 這是 json 編碼的字串。 | |
publishTimestampInMillis |
BIGINT |
No | 訊息發佈時的時間戳,以毫秒為單位。 | |
sequenceNumber |
BIGINT |
No | 在其資料分片內的唯一記錄 identifier。 |
範例
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
現在需要從 testing.streaming_table
查詢數據,以便進一步分析。
錯誤的查詢:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);