次の方法で共有


read_pulsar ストリーミングテーブル値関数

適用対象: check marked yes Databricks SQL 「はい」のチェック マーク Databricks Runtime 14.1 以降

重要

この機能はパブリック プレビュー段階にあります。

Pulsar から読み取られたレコードを含むテーブルを返します。

このテーブル値関数はストリーミングのみをサポートし、バッチ クエリはサポートしません。

構文

read_pulsar ( { option_key => option_value } [, ...] )

引数

この関数には、オプション キーの名前付きパラメーター呼び出しが必要です。

オプション serviceUrltopic は必須です。

ここでは引数について簡単に説明します。 詳細な説明については、構造化ストリーミング Pulsar のドキュメントを参照してください。

オプション Type Default 説明
serviceUrl STRING Mandatory Pulsar サービスの URI。
topic STRING Mandatory 読み取るトピック。
predefinedSubscription STRING なし Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。
subscriptionPrefix STRING なし Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。
pollTimeoutMs LONG 120000 Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。
failOnDataLoss BOOLEAN true データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。
startingOffsets STRING latest クエリが開始されるときの開始点。earliest、earliest、または特定のオフセットを指定する JSON 文字列のいずれかです。 latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。 earliest の場合、リーダーは最も早いオフセットから読み取ります。 ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。
startingTime STRING なし 指定すると、Pulsar ソースは指定された startingTime の位置からメッセージを読み取ります。

次の引数は、pulsar クライアントの認証に使われます。

オプション Type Default 説明
pulsarClientAuthPluginClassName STRING なし 認証プラグインの名前。
pulsarClientAuthParams STRING なし 認証プラグインのパラメーター。
pulsarClientUseKeyStoreTls STRING なし tls 認証に KeyStore を使うかどうか。
pulsarClientTlsTrustStoreType STRING なし tls 認証の TrustStore ファイルの種類。
pulsarClientTlsTrustStorePath STRING なし tls 認証の TrustStore ファイル パス。
pulsarClientTlsTrustStorePassword STRING なし tls 認証の TrustStore パスワード。

これらの引数は、Pulsar アドミッション コントロールの設定と認証に使われます。Pulsar 管理設定は、受付制御が有効な場合 (maxBytesPerTrigger が設定されている場合) にのみ必要です

オプション Type Default 説明
maxBytesPerTrigger BIGINT なし マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、admin.url も指定する必要があります。
adminUrl STRING なし Pulsar serviceHttpUrl の構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。
pulsarAdminAuthPlugin STRING なし 認証プラグインの名前。
pulsarAdminAuthParams STRING なし 認証プラグインのパラメーター。
pulsarClientUseKeyStoreTls STRING なし tls 認証に KeyStore を使うかどうか。
pulsarAdminTlsTrustStoreType STRING なし tls 認証の TrustStore ファイルの種類。
pulsarAdminTlsTrustStorePath STRING なし tls 認証の TrustStore ファイル パス。
pulsarAdminTlsTrustStorePassword STRING なし tls 認証の TrustStore パスワード。

返品

次のスキーマを持つ Pulsar レコードのテーブル。

  • __key STRING NOT NULL: Pulsar メッセージ キー。

  • value BINARY NOT NULL: Pulsar メッセージ値。

    注: Avro または JSON スキーマを含むトピックの場合、コンテンツをバイナリ値フィールドに読み込む代わりに、Pulsar トピックのフィールド名とフィールドの種類を保持するためにコンテンツが展開されます。

  • __topic STRING NOT NULL: Pulsar トピック名。

  • __messageId BINARY NOT NULL: Pulsar メッセージ ID。

  • __publishTime TIMESTAMP NOT NULL: Pulsar メッセージの発行時刻。

  • __eventTime TIMESTAMP NOT NULL: Pulsar メッセージのイベント時刻。

  • __messageProperties MAP<STRING, STRING>: Pulsar メッセージのプロパティ。

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.