read_pubsub
串流數據表值函式
適用於: Databricks SQL Databricks Runtime 13.3 LTS 和更新版本
傳回數據表,其中包含從主題讀取自 Pub/Sub 的記錄。 僅支援串流查詢。
語法
read_pubsub( { parameter => value } [, ...])
引數
read_pubsub
需要具名參數調用。
唯一必要的自變數是 subscriptionId
、 projectId
和 topicId
。 所有其他自變數都是選擇性的。
如需完整的自變數描述,請參閱 設定發佈/子串流讀取的選項。
Databricks 建議在提供授權選項時使用秘密。 請參閱 秘密函式。
如需設定 Pub/Sub 存取權的詳細資訊,請參閱 設定 Pub/Sub 的存取權。
參數 | 類型 | 描述 |
---|---|---|
subscriptionId |
STRING |
必要,指派給 Pub/Sub 訂用帳戶的唯一標識碼。 |
projectId |
STRING |
必要專案標識碼,與 Pub/Sub 主題相關聯的 Google Cloud 項目識別碼。 |
topicId |
STRING |
必要,要訂閱的 Pub/Sub 主題識別碼或名稱。 |
clientEmail |
STRING |
與服務帳戶相關聯的電子郵件地址進行驗證。 |
clientId |
STRING |
與服務帳戶相關聯的用戶端標識碼以進行驗證。 |
privateKeyId |
STRING |
與服務帳戶相關聯的私鑰標識碼。 |
privateKey |
STRING |
與服務帳戶相關聯的私鑰進行驗證。 |
從 Pub/Sub 讀取時,這些自變數會用於進一步微調:
參數 | 類型 | 描述 |
---|---|---|
numFetchPartitions |
STRING |
選擇性,具有預設的執行程序數目。 從訂用帳戶擷取記錄的平行 Spark 工作數目。 |
deleteSubscriptionOnStreamStop |
BOOLEAN |
選擇性,預設 false 為 。 如果設定為 true,當串流作業結束時,會刪除傳遞至數據流的訂用帳戶。 |
maxBytesPerTrigger |
STRING |
每個觸發的微批次期間,要處理的批次大小軟性限制。 預設值為 『none』。 |
maxRecordsPerFetch |
STRING |
處理記錄之前要擷取每個工作的記錄數目。 預設值為 『1000』。 |
maxFetchPeriod |
STRING |
處理記錄之前,每個工作要擷取的時間持續時間。 預設值為 『10s』。 |
傳回
具有下列架構的 Pub/Sub 記錄數據表。 屬性數據行可以是 Null,但所有其他數據行都不是 Null。
名稱 | 資料類型 | Nullable | 標準 | 描述 |
---|---|---|---|---|
messageId |
STRING |
No | Pub/Sub 訊息的唯一標識符。 | |
payload |
BINARY |
No | Pub/Sub 訊息的內容。 | |
attributes |
STRING |
Yes | 代表 Pub/Sub 訊息屬性的索引鍵/值組。 這是 json 編碼的字串。 | |
publishTimestampInMillis |
BIGINT |
No | 訊息發佈時的時間戳,以毫秒為單位。 | |
sequenceNumber |
BIGINT |
No | 其分區內記錄的唯一標識碼。 |
範例
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
現在需要從 testing.streaming_table
查詢數據,以便進一步分析。
錯誤的查詢:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);