共用方式為


read_pubsub 串流 table-valued 函式

適用於:核取記號為「是」 Databricks SQL 核取記號為「是」 Databricks Runtime 13.3 LTS 和更新版本

傳回一個 table,其中包含從 Pub/Sub 中某主題讀取的記錄。 僅支援串流查詢。

語法

read_pubsub( { parameter => value } [, ...])

引數

read_pubsub 需要具名參數調用

唯一必要的自變數是 subscriptionIdprojectIdtopicId。 所有其他自變數都是選擇性的。

如需完整的自變數描述,請參閱 設定發佈/子串流讀取的選項。

Databricks 建議在提供授權選項時使用秘密。 請參閱 秘密函式

如需設定 Pub/Sub 存取權的詳細資訊,請參閱 設定 Pub/Sub 的存取權。

參數 類型 描述
subscriptionId STRING 必要,指派給 Pub/Sub 訂閱的唯一識別碼 identifier。
projectId STRING 必要專案標識碼,與 Pub/Sub 主題相關聯的 Google Cloud 項目識別碼。
topicId STRING 必要,要訂閱的 Pub/Sub 主題識別碼或名稱。
clientEmail STRING 與服務帳戶相關聯的電子郵件地址進行驗證。
clientId STRING 與服務帳戶相關聯的用戶端標識碼以進行驗證。
privateKeyId STRING 與服務帳戶相關聯的私鑰標識碼。
privateKey STRING 與服務帳戶相關聯的私鑰進行驗證。

從 Pub/Sub 讀取時,這些自變數會用於進一步微調:

參數 類型 描述
numFetchPartitions STRING 選擇性,具有預設的執行程序數目。 從訂用帳戶擷取記錄的平行 Spark 工作數目。
deleteSubscriptionOnStreamStop BOOLEAN 選擇性,預設 false為 。 如果 set 為 true,當串流作業結束時,會刪除傳遞至串流的訂閱。
maxBytesPerTrigger STRING 每當觸發微批次時,需處理的批次大小設為軟性 limit。 預設值為 『none』。
maxRecordsPerFetch STRING 處理記錄之前要擷取每個工作的記錄數目。 預設值為 『1000』。
maxFetchPeriod STRING 處理記錄之前,每個工作要擷取的時間持續時間。 預設值為 『10s』。

傳回

具有下列 schema的 Pub/Sub 記錄 table。 column 的屬性可以是空值,但所有其他 columns 的屬性都不能是空值。

名稱 資料類型 Nullable 標準 描述
messageId STRING No Pub/Sub 訊息的「唯一 identifier」。
payload BINARY No Pub/Sub 訊息的內容。
attributes STRING Yes 代表 Pub/Sub 訊息屬性的索引鍵/值組。 這是 json 編碼的字串。
publishTimestampInMillis BIGINT No 訊息發佈時的時間戳,以毫秒為單位。
sequenceNumber BIGINT No 在其資料分片內的唯一記錄 identifier。

範例

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

現在需要從 testing.streaming_table 查詢數據,以便進一步分析。

錯誤的查詢:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);