read_kinesis
ストリーミングテーブル値関数
適用対象: Databricks SQL Databricks Runtime 13.3 LTS 以上
1 つ以上のストリームの Kinesis から読み取られたレコードを含むテーブルを返します。
構文
read_kinesis ( { parameter => value } [, ...] )
引数
read_kinesis
には、名前付きパラメーター呼び出しが必要です。
唯一必要な引数は streamName
です。 他の引数はすべて省略可能です。
ここでは引数について簡単に説明します。 詳細については、Amazon Kinesis のドキュメント を参照してください。
AWS に接続して認証するための接続オプションにはさまざまなものがあります。
awsAccessKey
、および awsSecretKey
は、secret 関数を使用して関数の引数で指定するか、引数を使用して手動で設定するか、次に示すように環境変数として構成できます。
roleArn
、roleExternalID
、roleSessionName
は、インスタンス プロファイルを使用して AWS で認証するときに使用することもできます。
どれも指定されていない場合は、既定の AWS プロバイダー チェーンが使用されます。
パラメーター | 型 | 説明 |
---|---|---|
streamName |
STRING |
必須。1 つ以上の kinesis ストリームのコンマ区切りのリスト。 |
awsAccessKey |
STRING |
AWS アクセス キー (存在する場合)。 環境変数 (AWS_ACCESS_KEY_ID ) や資格情報プロファイル ファイルなど、AWS の既定の資格情報プロバイダー チェーンでサポートされているさまざまなオプションを使用して指定することもできます。 |
awsSecretKey |
STRING |
アクセス キーに対応する秘密鍵。 引数で指定するか、環境変数 (AWS_SECRET_KEY または AWS_SECRET_ACCESS_KEY ) や資格情報プロファイル ファイルなど、AWS の既定の資格情報プロバイダー チェーンでサポートされているさまざまなオプションを使用して指定することができます。 |
roleArn |
STRING |
Kinesis にアクセスするときに想定されるロールの Amazon リソース名。 |
roleExternalId |
STRING |
AWS アカウントへのアクセスを委任するときに使用されます。 |
roleSessionName |
STRING |
AWS ロールセッション名。 |
stsEndpoint |
STRING |
一時的なアクセス資格情報を要求するためのエンドポイント。 |
region |
STRING |
指定するストリームのリージョン。 既定値は、ローカルに解決されたリージョンです。 |
endpoint |
STRING |
Kinesis データ ストリームのリージョン エンドポイント。 既定値は、ローカルに解決されたリージョンです。 |
initialPosition |
STRING |
ストリーム内からの読み取りの開始場所。 次のいずれか: 'latest' (既定値)、'trim_horizon'、'earliest'、'at_timestamp'。 |
consumerMode |
STRING |
次のいずれか: 'polling' (既定値)、または 'EFO' (enhanced-fan-out)。 |
consumerName |
STRING |
コンシューマーの名前。 すべてのコンシューマーにプレフィックス 'databricks_' が付加されます。 既定値は空の文字列です。 |
registerConsumerTimeoutInterval |
STRING |
Kinesis EFO コンシューマーが Kinesis ストリームに登録されるのを待つときの最大タイムアウト。この時間が経過した後、エラーがスローされます。 既定値は '300s' です。 |
requireConsumerDeregistration |
BOOLEAN |
true の場合、クエリの終了時に EFO コンシューマーが登録解除されます。 既定値は false です。 |
deregisterConsumerTimeoutInterval |
STRING |
Kinesis EFO コンシューマーが Kinesis ストリームから登録解除されるのを待つときの最大タイムアウト。この時間が経過した後、エラーがスローされます。 既定値は '300s' です。 |
consumerRefreshInterval |
STRING |
コンシューマーが確認および更新される間隔。 既定値は '300s' です。 |
次の引数は、Kinesis の読み取りスループットと待機時間の制御に使用されます。
パラメーター | 型 | 説明 |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
省略可能。Kinesis への API 要求ごとに読み取られるレコード数 (既定では 10,000)。 |
maxFetchRate |
STRING |
シャードごとにデータをプリフェッチする速度。 MB/s で測定された '1.0' と '2.0' の間の値。 既定値は '1.0' です。 |
minFetchPeriod |
STRING |
連続するプリフェッチ試行間の最大待機時間。 既定値は '400ms' です。 |
maxFetchDuration |
STRING |
プリフェッチされた新しいデータをバッファーする最長期間。 既定値は '10s' です。 |
fetchBufferSize |
STRING |
次のトリガーのデータ量。 既定値は '20gb' です。 |
shardsPerTask |
INTEGER (>0) |
Spark タスクごとに並行でプリフェッチする Kinesis シャードの数。 既定値は 5 です。 |
shardFetchinterval |
STRING |
再シャード化のためのポーリング頻度。 既定値は '1s' です。 |
coalesceThresholdBlockSize |
INTEGER (>0) |
自動結合が発生するしきい値。 既定値は 10,000,000 です。 |
coalesce |
BOOLEAN |
true の場合、プリフェッチされた要求が結合されます。 既定値は、true です。 |
coalesceBinSize |
INTEGER (>0) |
結合後の概算ブロック サイズ。 既定値は 128,000,000 です。 |
reuseKinesisClient |
BOOLEAN |
true の場合、キャッシュに格納されている Kinesis クライアントが再利用されます。 既定値は true です (PE クラスターを除く)。 |
clientRetries |
INTEGER (>0) |
再試行シナリオでの再試行回数。 既定値は 5 です。 |
返品
次のスキーマを持つ Kinesis レコードのテーブル。
名前 | データ型 | Nullable | Standard | 説明 |
---|---|---|---|---|
partitionKey |
STRING |
いいえ | ストリームのシャード間でデータを分散するときに使用されるキー。 パーティション キーが同じデータ レコードはすべて、同じシャードから読み取られます。 | |
data |
BINARY |
いいえ | kinesis データ ペイロード、base-64 エンコード済み。 | |
stream |
STRING |
いいえ | データ読み取り元のストリームの名前。 | |
shardId |
STRING |
いいえ | データ読み取り元のシャードの一意識別子。 | |
sequenceNumber |
BIGINT |
いいえ | シャード内のレコードの一意識別子。 | |
approximateArrivalTimestamp |
TIMESTAMP |
いいえ | ストリームへのおおよそのレコード挿入時間。 |
(stream, shardId, sequenceNumber)
列が主キーを構成します。
例
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');