read_pulsar
ストリーミングテーブル値関数
適用対象: Databricks SQL Databricks Runtime 14.1 以降
重要
この機能はパブリック プレビュー段階にあります。
Pulsar から読み取られたレコードを含むテーブルを返します。
このテーブル値関数はストリーミングのみをサポートし、バッチ クエリはサポートしません。
構文
read_pulsar ( { option_key => option_value } [, ...] )
引数
この関数には、オプション キーの名前付きパラメーター呼び出しが必要です。
オプション serviceUrl
と topic
は必須です。
ここでは引数について簡単に説明します。 詳細な説明については、構造化ストリーミング Pulsar のドキュメントを参照してください。
オプション | Type | Default | 説明 |
---|---|---|---|
serviceUrl | STRING | Mandatory | Pulsar サービスの URI。 |
topic | STRING | Mandatory | 読み取るトピック。 |
predefinedSubscription | STRING | なし | Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。 |
subscriptionPrefix | STRING | なし | Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。 |
pollTimeoutMs | LONG | 120000 | Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。 |
failOnDataLoss | BOOLEAN | true | データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。 |
startingOffsets | STRING | latest | クエリが開始されるときの開始点。earliest、earliest、または特定のオフセットを指定する JSON 文字列のいずれかです。 latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。 earliest の場合、リーダーは最も早いオフセットから読み取ります。 ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。 |
startingTime | STRING | なし | 指定すると、Pulsar ソースは指定された startingTime の位置からメッセージを読み取ります。 |
次の引数は、pulsar クライアントの認証に使われます。
オプション | Type | Default | 説明 |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | なし | 認証プラグインの名前。 |
pulsarClientAuthParams | STRING | なし | 認証プラグインのパラメーター。 |
pulsarClientUseKeyStoreTls | STRING | なし | tls 認証に KeyStore を使うかどうか。 |
pulsarClientTlsTrustStoreType | STRING | なし | tls 認証の TrustStore ファイルの種類。 |
pulsarClientTlsTrustStorePath | STRING | なし | tls 認証の TrustStore ファイル パス。 |
pulsarClientTlsTrustStorePassword | STRING | なし | tls 認証の TrustStore パスワード。 |
これらの引数は、Pulsar アドミッション コントロールの設定と認証に使われます。Pulsar 管理設定は、受付制御が有効な場合 (maxBytesPerTrigger が設定されている場合) にのみ必要です
オプション | Type | Default | 説明 |
---|---|---|---|
maxBytesPerTrigger | BIGINT | なし | マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、admin.url も指定する必要があります。 |
adminUrl | STRING | なし | Pulsar serviceHttpUrl の構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。 |
pulsarAdminAuthPlugin | STRING | なし | 認証プラグインの名前。 |
pulsarAdminAuthParams | STRING | なし | 認証プラグインのパラメーター。 |
pulsarClientUseKeyStoreTls | STRING | なし | tls 認証に KeyStore を使うかどうか。 |
pulsarAdminTlsTrustStoreType | STRING | なし | tls 認証の TrustStore ファイルの種類。 |
pulsarAdminTlsTrustStorePath | STRING | なし | tls 認証の TrustStore ファイル パス。 |
pulsarAdminTlsTrustStorePassword | STRING | なし | tls 認証の TrustStore パスワード。 |
返品
次のスキーマを持つ Pulsar レコードのテーブル。
__key STRING NOT NULL
: Pulsar メッセージ キー。value BINARY NOT NULL
: Pulsar メッセージ値。注: Avro または JSON スキーマを含むトピックの場合、コンテンツをバイナリ値フィールドに読み込む代わりに、Pulsar トピックのフィールド名とフィールドの種類を保持するためにコンテンツが展開されます。
__topic STRING NOT NULL
: Pulsar トピック名。__messageId BINARY NOT NULL
: Pulsar メッセージ ID。__publishTime TIMESTAMP NOT NULL
: Pulsar メッセージの発行時刻。__eventTime TIMESTAMP NOT NULL
: Pulsar メッセージのイベント時刻。__messageProperties MAP<STRING, STRING>
: Pulsar メッセージのプロパティ。
例
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.