read_pubsub
streaming table-valued function
Applies to: Databricks SQL and Databricks Runtime 13.3 LTS and above
Returns a table with records read from a Pub/Sub topic. Only supports streaming queries.
Syntax
read_pubsub( { parameter => value } [, ...])
Arguments
read_pubsub requires named parameter invocation. The only required arguments are subscriptionId, projectId, and topicId; all other arguments are optional.
For full argument descriptions, see Configure options for Pub/Sub streaming read.
Databricks recommends using secrets when providing authorization options. See the secret function.
For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.
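For example, a credential stored in a Databricks secret scope can be retrieved inline with the secret function; the scope and key names here are hypothetical:
> SELECT secret('app-events', 'privateKey');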
Parameter | Type | Description
---|---|---
subscriptionId | STRING | Required. The unique identifier assigned to a Pub/Sub subscription.
projectId | STRING | Required. The Google Cloud project ID associated with the Pub/Sub topic.
topicId | STRING | Required. The ID or name of the Pub/Sub topic to subscribe to.
clientEmail | STRING | The email address associated with a service account for authentication.
clientId | STRING | The client ID associated with the service account for authentication.
privateKeyId | STRING | The ID of the private key associated with the service account.
privateKey | STRING | The private key associated with the service account for authentication.
These optional arguments provide further fine-tuning when reading from Pub/Sub:
Parameter | Type | Description
---|---|---
numFetchPartitions | STRING | Optional. The number of parallel Spark tasks that fetch records from a subscription. Defaults to the number of executors.
deleteSubscriptionOnStreamStop | BOOLEAN | Optional. If set to true, the subscription passed to the stream is deleted when the streaming job ends. Default: false.
maxBytesPerTrigger | STRING | Optional. A soft limit on the batch size processed during each triggered micro-batch. Default: 'none'.
maxRecordsPerFetch | STRING | Optional. The number of records to fetch per task before processing records. Default: '1000'.
maxFetchPeriod | STRING | Optional. The time duration for each task to fetch before processing records. Default: '10s'.
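As an illustration, the following sketch combines several of these tuning options. The option values are arbitrary starting points, not recommendations, and the subscription, project, topic, and table names are hypothetical:
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    numFetchPartitions => '8',              -- parallel fetch tasks
    maxRecordsPerFetch => '5000',           -- records fetched per task before processing
    maxFetchPeriod => '30s',                -- fetch window per task
    deleteSubscriptionOnStreamStop => true  -- clean up the subscription when the stream stops
  );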
Returns
A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are not null.
Name | Data type | Nullable | Description
---|---|---|---
messageId | STRING | No | Unique identifier for the Pub/Sub message.
payload | BINARY | No | The content of the Pub/Sub message.
attributes | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message. This is a JSON-encoded string.
publishTimestampInMillis | BIGINT | No | The timestamp when the message was published, in milliseconds.
sequenceNumber | BIGINT | No | The unique identifier of the record within its shard.
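Because payload is BINARY and attributes is a JSON-encoded string, downstream queries typically decode both. The following sketch assumes UTF-8 text payloads and string-valued attributes, and uses the standard CAST, from_json, and timestamp_millis functions; all names are hypothetical:
> CREATE STREAMING TABLE testing.decoded_streaming_table AS
  SELECT
    messageId,
    CAST(payload AS STRING) AS payload_text,                        -- assumes UTF-8 encoded payloads
    from_json(attributes, 'MAP<STRING, STRING>') AS attribute_map,  -- parse the JSON-encoded attributes
    timestamp_millis(publishTimestampInMillis) AS published_at      -- epoch milliseconds to TIMESTAMP
  FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic'
  );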
Examples
-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    clientEmail => secret('app-events', 'clientEmail'),
    clientId => secret('app-events', 'clientId'),
    privateKeyId => secret('app-events', 'privateKeyId'),
    privateKey => secret('app-events', 'privateKey')
  );
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic'
  );
The data can now be queried from testing.streaming_table for further analysis.
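For instance, a follow-up query can decode payloads and filter on a message attribute; the eventType attribute key below is hypothetical:
> SELECT
    messageId,
    CAST(payload AS STRING) AS body,
    from_json(attributes, 'MAP<STRING, STRING>')['eventType'] AS event_type  -- hypothetical attribute key
  FROM testing.streaming_table
  WHERE timestamp_millis(publishTimestampInMillis) >= current_timestamp() - INTERVAL 1 HOUR;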
Erroneous queries:
-- Missing the required topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project'
  );
-- Option value is too high, raising the MAX_RECORDS_PER_FETCH_LIMIT error
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    maxRecordsPerFetch => '1000001'
  );