共用方式為


read_pubsub 串流數據表值函式

適用於:核取記號為「是」 Databricks SQL 核取記號為「是」 Databricks Runtime 13.3 LTS 和更新版本

傳回數據表,其中包含從主題讀取自 Pub/Sub 的記錄。 僅支援串流查詢。

語法

read_pubsub( { parameter => value } [, ...])

引數

read_pubsub需要具名參數調用

唯一必要的自變數是 subscriptionIdprojectIdtopicId。 所有其他自變數都是選擇性的。

如需完整的自變數描述,請參閱 設定發佈/子串流讀取的選項。

Databricks 建議在提供授權選項時使用秘密。 請參閱 秘密函式

如需設定 Pub/Sub 存取權的詳細資訊,請參閱 設定 Pub/Sub 的存取權。

參數 類型 描述
subscriptionId STRING 必要,指派給 Pub/Sub 訂用帳戶的唯一標識碼。
projectId STRING 必要專案標識碼,與 Pub/Sub 主題相關聯的 Google Cloud 項目識別碼。
topicId STRING 必要,要訂閱的 Pub/Sub 主題識別碼或名稱。
clientEmail STRING 與服務帳戶相關聯的電子郵件地址進行驗證。
clientId STRING 與服務帳戶相關聯的用戶端標識碼以進行驗證。
privateKeyId STRING 與服務帳戶相關聯的私鑰標識碼。
privateKey STRING 與服務帳戶相關聯的私鑰進行驗證。

從 Pub/Sub 讀取時,這些自變數會用於進一步微調:

參數 類型 描述
numFetchPartitions STRING 選擇性,具有預設的執行程序數目。 從訂用帳戶擷取記錄的平行 Spark 工作數目。
deleteSubscriptionOnStreamStop BOOLEAN 選擇性,預設 false為 。 如果設定為 true,當串流作業結束時,會刪除傳遞至數據流的訂用帳戶。
maxBytesPerTrigger STRING 每個觸發的微批次期間,要處理的批次大小軟性限制。 預設值為 『none』。
maxRecordsPerFetch STRING 處理記錄之前要擷取每個工作的記錄數目。 預設值為 『1000』。
maxFetchPeriod STRING 處理記錄之前,每個工作要擷取的時間持續時間。 預設值為 『10s』。

傳回

具有下列架構的 Pub/Sub 記錄數據表。 屬性數據行可以是 Null,但所有其他數據行都不是 Null。

名稱 資料類型 Nullable 標準 描述
messageId STRING No Pub/Sub 訊息的唯一標識符。
payload BINARY No Pub/Sub 訊息的內容。
attributes STRING Yes 代表 Pub/Sub 訊息屬性的索引鍵/值組。 這是 json 編碼的字串。
publishTimestampInMillis BIGINT No 訊息發佈時的時間戳,以毫秒為單位。
sequenceNumber BIGINT No 其分區內記錄的唯一標識碼。

範例

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

現在需要從 testing.streaming_table 查詢數據,以便進一步分析。

錯誤的查詢:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);