チュートリアル: Spark SQL を使用した COPY INTO
Databricks では、数千のファイルを含むデータ ソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。 Databricks では、高度なユース ケース 自動ローダー を使用することをお勧めします。
このチュートリアルでは、COPY INTO
コマンドを使用して、クラウド オブジェクト ストレージから Azure Databricks ワークスペース内のテーブルにデータを読み込みます。
必要条件
- Azure サブスクリプション、そのサブスクリプション内の Azure Databricks ワークスペース、およびそのワークスペース内のクラスター。 これらを作成するには、「クイック スタート: Azure portalを使用して Azure Databricks ワークスペースで Spark ジョブを実行する」を参照してください。 このクイック スタートに従う場合は、「Spark SQL ジョブ の実行」セクションの手順に従う必要はありません。
- ワークスペース内で Databricks Runtime 11.3 LTS 以降を実行している、汎用的な クラスター。 汎用クラスターを作成するには、「コンピューティング構成リファレンス
参照してください。 - Azure Databricks ワークスペースのユーザー インターフェイスに関する知識。 「ワークスペースのを移動する」を参照してください。
- Databricks ノートブックの操作に関する知識。
- データを書き込むことができる場所。このデモでは DBFS ルートを例として使用していますが、Databricks では Unity カタログで構成された外部ストレージの場所をお勧めします。
手順 1. 環境を構成し、データ ジェネレーターを作成する
このチュートリアルでは、Azure Databricks と既定のワークスペース構成に関する基本的な知識を前提としています。 指定されたコードを実行できない場合は、ワークスペース管理者に連絡して、コンピューティング リソースへのアクセス権と、データを書き込むことができる場所を確認してください。
指定されたコードでは、source
パラメーターを使用して、COPY INTO
データ ソースとして構成する場所を指定します。 記述されているように、このコードは DBFS ルート上の場所を指します。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/
部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするための再帰的な削除も行われるため、運用環境のデータでこれを指さないことを確認し、既存のデータを上書きまたは削除しないように、/user/{username}/copy-into-demo
入れ子になったディレクトリを保持してください。
新しい SQL ノートブック を作成し、それを Databricks Runtime 11.3 LTS 以降を実行するクラスター に接続します。 次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
次のコードをコピーして実行して、データをランダムに生成するために使用されるテーブルと関数をいくつか構成します。
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
手順 2: サンプル データをクラウド ストレージに書き込む
Delta Lake 以外のデータ形式への書き込みは、Azure Databricks ではまれです。 ここで示すコードは JSON に書き込み、別のシステムからオブジェクト ストレージに結果をダンプする可能性のある外部システムをシミュレートします。
次のコードをコピーして実行して、生の JSON データのバッチを書き込みます。
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
手順 3: COPY INTO を使用して JSON データをべき等に読み込む
COPY INTO
を使用する前に、ターゲット Delta Lake テーブルを作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、CREATE TABLE
ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。
次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
このアクションはべき等であるため、複数回実行できますが、データは 1 回だけ読み込まれます。
手順 4: テーブルの内容をプレビューする
簡単な SQL クエリを実行して、このテーブルの内容を手動で確認できます。
次のコードをコピーして実行して、テーブルをプレビューします。
-- Review updated table SELECT * FROM user_ping_target
手順 5: より多くのデータを読み込み、結果をプレビューする
手順 2 から 4 を何度も実行し直して、ランダムな生 JSON データの新しいバッチをソースに配置し、COPY INTO
を使ってそれを Delta Lake にべき等に読み込んで、結果をプレビューできます。 これらの手順を順不同でまたは複数回実行して、生データの複数のバッチが書き込まれる様子をシミュレートしたり、新しいデータが到着しない状況で COPY INTO
を複数回実行してみてください。
手順 6: チュートリアルをクリーンアップする
このチュートリアルが完了したら、関連付けられているリソースを保持する必要がなくなったら、クリーンアップできます。
次のコードをコピーして実行して、データベース、テーブルを削除し、すべてのデータを削除します。
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
コンピューティング リソースを停止するには、[
クラスター ] タブに移動し、クラスター 終了します。
その他のリソース
- COPY INTO リファレンス記事