read_pulsar streaming table-valued function

Applies to: Databricks SQL and Databricks Runtime 14.1 and above

Important

This feature is in Public Preview.

Returns a table with records read from Pulsar.

This table-valued function supports only streaming queries, not batch queries.

Syntax

read_pulsar ( { option_key => option_value } [, ...] )

Arguments

This function requires named parameter invocation for the option keys.

The options serviceUrl and topic are mandatory.

The argument descriptions here are brief. See the Structured Streaming Pulsar documentation for extended descriptions.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| serviceUrl | STRING | Mandatory | The URI of the Pulsar service. |
| topic | STRING | Mandatory | The topic to read from. |
| predefinedSubscription | STRING | None | The predefined subscription name used by the connector to track Spark application progress. |
| subscriptionPrefix | STRING | None | A prefix used by the connector to generate a random subscription to track Spark application progress. |
| pollTimeoutMs | LONG | 120000 | The timeout for reading messages from Pulsar, in milliseconds. |
| failOnDataLoss | BOOLEAN | true | Controls whether to fail a query when data is lost (for example, topics are deleted, or messages are deleted because of a retention policy). |
| startingOffsets | STRING | latest | The start point when a query is started: earliest, latest, or a JSON string that specifies a specific offset. If latest, the reader reads the newest records after it starts running. If earliest, the reader reads from the earliest offset. |
| startingTime | STRING | None | When specified, the Pulsar source reads messages starting from the position of the specified startingTime. |
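
As a rough illustration of how the optional arguments combine with the two mandatory ones, the following sketch sets a named subscription, a shorter poll timeout, and tolerant data-loss handling. The table name, broker address, topic, and subscription name are placeholders chosen for this example.

```sql
-- Hypothetical names: testing.tuned_stream, broker.example.com, my-topic, my-subscription.
CREATE STREAMING TABLE testing.tuned_stream AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic',
      -- Track progress with a named subscription instead of a generated one.
      predefinedSubscription => 'my-subscription',
      -- Wait up to 60 seconds for messages (the default is 120000 ms).
      pollTimeoutMs => 60000,
      -- Keep the query running if messages were removed, for example by a retention policy.
      failOnDataLoss => false,
      -- Read only records that arrive after the query starts.
      startingOffsets => 'latest');
```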

The following arguments are used for authentication of the Pulsar client:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| pulsarClientAuthPluginClassName | STRING | None | Name of the authentication plugin. |
| pulsarClientAuthParams | STRING | None | Parameters for the authentication plugin. |
| pulsarClientUseKeyStoreTls | STRING | None | Whether to use KeyStore for TLS authentication. |
| pulsarClientTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication. |
| pulsarClientTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication. |
| pulsarClientTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication. |
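
A minimal sketch of how the trust store arguments might be combined with the authentication plugin arguments. It assumes the broker exposes a pulsar+ssl endpoint on port 6651 (the conventional Pulsar TLS port) and that both stores are JKS files. The table name, file paths, and passwords are placeholders; real passwords would normally come from a secret store rather than a literal.

```sql
-- Hypothetical names and paths; adjust to your environment.
CREATE STREAMING TABLE testing.tls_stream AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar+ssl://broker.example.com:6651',
      topic => 'my-topic',
      pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
      pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw',
      -- The option type is STRING, so the flag is passed as a string literal.
      pulsarClientUseKeyStoreTls => 'true',
      pulsarClientTlsTrustStoreType => 'JKS',
      pulsarClientTlsTrustStorePath => '/var/private/tls/client.truststore.jks',
      pulsarClientTlsTrustStorePassword => 'trustpw');
```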

The following arguments are used for configuration and authentication of Pulsar admission control. Pulsar admin configuration is required only when admission control is enabled (that is, when maxBytesPerTrigger is set):

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxBytesPerTrigger | BIGINT | None | A soft limit on the maximum number of bytes to process per microbatch. If this is specified, adminUrl also needs to be specified. |
| adminUrl | STRING | None | The Pulsar serviceHttpUrl configuration. Only needed when maxBytesPerTrigger is specified. |
| pulsarAdminAuthPlugin | STRING | None | Name of the authentication plugin. |
| pulsarAdminAuthParams | STRING | None | Parameters for the authentication plugin. |
| pulsarClientUseKeyStoreTls | STRING | None | Whether to use KeyStore for TLS authentication. |
| pulsarAdminTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication. |
| pulsarAdminTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication. |
| pulsarAdminTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication. |
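
To make the dependency between maxBytesPerTrigger and adminUrl concrete, here is a small sketch that enables admission control. The admin endpoint shown assumes the broker's HTTP service listens on port 8080, which is a common Pulsar default but may differ in your deployment; the table name and the 10 MB limit are arbitrary choices for illustration.

```sql
-- Hypothetical names: testing.throttled_stream, broker.example.com, my-topic.
CREATE STREAMING TABLE testing.throttled_stream AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic',
      -- Soft limit of 10 MB (10 * 1024 * 1024 bytes) per microbatch.
      maxBytesPerTrigger => 10485760,
      -- Required because maxBytesPerTrigger is set; assumed admin HTTP endpoint.
      adminUrl => 'http://broker.example.com:8080');
```

Because the limit is soft, a microbatch may process somewhat more than the configured number of bytes.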

Returns

A table of Pulsar records with the following schema:

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Pulsar message value.

    Note: For topics with Avro or JSON schema, instead of loading content into a binary value field, the content will be expanded to preserve the field names and field types of the Pulsar topic.

  • __topic STRING NOT NULL: Pulsar topic name.

  • __messageId BINARY NOT NULL: Pulsar message id.

  • __publishTime TIMESTAMP NOT NULL: Pulsar message publish time.

  • __eventTime TIMESTAMP NOT NULL: Pulsar message event time.

  • __messageProperties MAP<STRING, STRING>: Pulsar message properties.
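
The schema above can be used to project and decode individual columns rather than selecting everything. The following is a sketch that assumes the topic has no Avro or JSON schema (so the binary value column is present) and that payloads are UTF-8 text. The table name and the 'source' property key are hypothetical.

```sql
-- Hypothetical names: testing.decoded_stream and the 'source' message property.
CREATE STREAMING TABLE testing.decoded_stream AS
  SELECT
      __key AS message_key,
      -- Decode the binary payload; assumes UTF-8 text messages.
      CAST(value AS STRING) AS payload,
      __topic,
      __publishTime,
      -- Returns NULL when the property is absent.
      try_element_at(__messageProperties, 'source') AS source
  FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      topic => 'my-topic');
```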

Examples

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic',
      pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
      pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw');

The data can now be queried from testing.streaming_table for further analysis.
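
For instance, a follow-up query along these lines could summarize what has been ingested so far. It relies only on the columns listed under Returns; the aliases are illustrative.

```sql
-- Count ingested messages per topic and find the most recent publish time.
SELECT
    __topic,
    count(*) AS message_count,
    max(__publishTime) AS latest_publish_time
FROM testing.streaming_table
GROUP BY __topic;
```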