Databricks SQL でストリーミング テーブルを使用してデータを読み込む
Databricks では、Databricks SQL を使用してデータを取り込むために、ストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルとは、ストリーミングまたは増分データ処理を追加でサポートする Unity Catalog に登録されたテーブルのことです。 Delta Live Tables パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、Kafka とクラウド オブジェクト ストレージから増分データ読み込みを行うことができます。
この記事では、Unity Catalog ボリューム (推奨) または外部の場所として構成されたクラウド オブジェクト ストレージからデータを読み込むために、ストリーミング テーブルを使用することについて説明します。
Note
Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、「Delta テーブルのストリーミング読み取りと書き込み」を参照してください。
重要
Databricks SQL で作成されたストリーミング テーブルは、サーバーレス Delta Live Tables パイプラインによってサポートされます。 この機能を使用するには、ワークスペースでサーバーレス パイプラインをサポートする必要があります。
始める前に
操作を始める前に、以下の要件を満たしておく必要があります。
ワークスペースの要件:
- サーバーレスが有効になっている Azure Databricks アカウント。 詳細については、「サーバーレス SQL ウェアハウスを有効にする」を参照してください。
- Unity Catalog が有効になっているワークスペース。 詳細については、「Unity Catalog の設定と管理」を参照してください。
コンピューティングの要件:
次のいずれかを使用する必要があります。
Current
チャネルを使用する SQL ウェアハウス。Databricks Runtime 13.3 LTS 以降で共有アクセス モードを使用したコンピューティング。
Databricks Runtime 15.4 LTS 以降のシングル ユーザー アクセス モードを使用したコンピューティング。
Databricks Runtime 15.3 以下では、シングル ユーザー コンピューティングを使用して、他のユーザーが所有するストリーミング テーブルに対してクエリ 実行することはできません。 Databricks Runtime 15.3 以下では、ストリーミング テーブルを所有している場合にのみ、シングル ユーザー コンピューティングを使用できます。 テーブルの作成者が所有者です。
Databricks Runtime 15.4 LTS 以降では、テーブルの所有権に関係なく、単一ユーザー コンピューティングで Delta Live テーブルによって生成されたテーブルに対するクエリがサポートされます。 Databricks Runtime 15.4 LTS 以降で提供されるデータ フィルター処理を利用するには、"ワークスペースでサーバーレス コンピューティングが有効になっている" ことを確認する必要があります。Delta Live Tables で生成されたテーブルをサポートするデータ フィルター処理は、サーバーレス コンピューティングで実行されるためです。 シングル ユーザー コンピューティングを使用してデータ フィルター処理操作を実行すると、サーバーレス コンピューティング リソースに対して課金される可能性があります。 シングル ユーザー コンピューティングでの詳細なアクセス制御を参照してください。
アクセス許可の要件:
- Unity Catalog の外部の場所での
READ FILES
特権。 情報については、「クラウド ストレージを Azure Databricks に接続するための外部の場所を作成する」を参照してください。 - ストリーミング テーブルを作成するカタログでの
USE CATALOG
特権。 - ストリーミング テーブルを作成するスキーマでの
USE SCHEMA
特権。 - ストリーミング テーブルを作成するスキーマでの
CREATE TABLE
特権。
その他の要件:
ソース データへのパス。
ボリューム パスの例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
外部の場所のパスの例:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
Note
この記事では、読み込むデータが、Unity Catalog ボリュームまたはアクセスできる外部の場所に対応するクラウドの保存場所にあることを前提としています。
ソース データの検出とプレビュー
ワークスペースのサイドバーで、[クエリ] をクリックし、[クエリの作成] をクリックします。
クエリ エディターで、ドロップダウン リストから
Current
チャネルを使用する SQL ウェアハウスを選択します。次のコマンドをエディターに貼り付け、ソース データを識別する情報を山かっこ (
<>
) に置き換え、[実行] をクリックします。Note
関数の既定値でデータを解析できない場合は、
read_files
テーブル値関数の実行時にスキーマ推論エラーが発生する可能性があります。 たとえば、複数行の CSV ファイルまたは JSON ファイルに対して複数行モードを構成する必要がある場合があります。 パーサー オプションの一覧については、「read_files テーブル値関数」を参照してください。/* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
ストリーミング テーブルにデータを読み込む
クラウド オブジェクト ストレージのデータからストリーミング テーブルを作成するには、クエリ エディターに次のコマンドを貼り付けて、[実行] をクリックします。
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
ランタイム チャネルを設定する
SQL ウェアハウスを使用して作成されたストリーミング テーブルは、Delta Live Tables パイプラインを使用して自動的に更新されます。 Delta Live Tables パイプラインでは、既定で current
チャネルのランタイムが使用されます。 リリース プロセスの詳細については、 Delta Live Tables のリリース ノートとリリース アップグレード プロセス を参照してください。
Databricks では、運用ワークロードに current
チャネルを使用することをお勧めします。 新機能は、最初に preview
チャネルにリリースされます。 パイプラインをプレビュー Delta Live Tables チャネルに設定し、テーブル プロパティとして preview
を指定することで、新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。
次のコード例は、CREATE ステートメントでチャネルをプレビューに設定する方法を示しています。
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
## <a id="refresh"></a> Refresh a <st> using a DLT pipeline
This section describes patterns for refreshing a <st> with the latest data available from the sources defined in the query.
When you `CREATE` or `REFRESH` a <st>, the update processes using a serverless <DLT> pipeline. Each <st> you define has an associated <DLT> pipeline.
After you run the `REFRESH` command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.
.. note:: Only the table owner can refresh a <st> to get the latest data. The user that creates the table is the owner, and the owner can't be changed. You might need to refresh your <st> before using [time travel](/delta/history.md#time-travel) queries.
See [_](/delta-live-tables/index.md).
### Ingest new data only
By default, the `read_files` function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the `includeExistingFiles` option to `false`. This means that only data that arrives in the directory after table creation is processed. For example:
.. azure::
```sql
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
ストリーミング テーブルを完全に更新する
完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り詰められるため、データの履歴全体を保持しないソースや、Kafka など、保持期間が短いソースの場合、完全更新の呼び出しは推奨されません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。
次に例を示します。
REFRESH STREAMING TABLE my_bronze_table FULL
自動更新するストリーミング テーブルをスケジュールする
定義されたスケジュールに基づいて自動的に更新されるようにストリーミング テーブルを構成するには、クエリ エディターに次のコマンドを貼り付けて、[実行] をクリックします。
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
更新スケジュール クエリの例については、「代替ストリーミング テーブル」を参照してください。
更新の状態を追跡する
Delta Live Tables UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの DESCRIBE EXTENDED
コマンドによって返される更新情報を表示することで、ストリーミング テーブルの更新の状態を表示できます。
DESCRIBE EXTENDED <table-name>
Kafka からのストリーミング インジェスト
Kafka からのストリーミング インジェストの例については、「read_kafka」を参照してください。
ストリーミング テーブルへのアクセス権をユーザーに付与する
ストリーミング テーブルに対する SELECT
権限をユーザーに付与してクエリを実行できるようにするには、次のコマンドをクエリ エディターに貼り付けて、[実行] をクリックします。
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Unity Catalog のセキュリティ保護可能なオブジェクトの付与に関する詳細については、「Unity カタログの特権とセキュリティ保護可能なオブジェクト」を参照してください。
クエリ履歴を使用して実行を監視する
クエリ履歴ページを使用すると、ストリーミング テーブルの更新を実行するために使用される Delta Live Tables パイプラインでパフォーマンスの低いクエリやボトルネックを特定するのに役立つクエリの詳細とクエリ プロファイルにアクセスできます。 クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 Query 履歴 および Query プロファイルを参照してください。
重要
この機能はパブリック プレビュー段階にあります。 ワークスペース管理者は、プレビュー ページからこの機能を有効にすることができます。 「Azure Databricks プレビューを管理する」を参照してください。
ストリーミング テーブルに関連するすべてのステートメントがクエリ履歴に表示されます。 Statement ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを調べることができます。 すべての CREATE
ステートメントの後に、Delta Live Tables パイプラインで非同期的に実行される REFRESH
ステートメントが続きます。 REFRESH
ステートメントには、通常、パフォーマンスの最適化に関する分析情報を提供する詳細なクエリ プランが含まれます。
クエリ履歴 UI REFRESH
ステートメントにアクセスするには、次の手順に従います。
- 左側のサイドバーで をクリックして、 Query 履歴 UI を開きます。
- Statement ドロップダウン フィルターから REFRESH チェック ボックスをオンにします。
- クエリ ステートメントの名前をクリックすると、クエリの期間や集計されたメトリックなどの概要の詳細が表示されます。
- クエリ プロファイルを表示をクリックして、クエリ プロファイルを開きます。 クエリ プロファイルの移動の詳細については、 Query プロファイル を参照してください。
- 必要に応じて、 Query Source セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
SQL エディターのリンクを使用するか、SQL ウェアハウスに接続されているノートブックから、クエリの詳細にアクセスすることもできます。
Note
ストリーミング テーブルは、 preview チャネルを使用して実行するように構成する必要があります。 「 ランタイム チャネルの設定」を参照してください。