次の方法で共有


チュートリアル: Spark SQL を使用した COPY INTO

Databricks では、数千のファイルを含むデータ ソースの増分データ読み込みと一括データ読み込みには、COPY INTO コマンドを使用することをお勧めします。 Databricks では、高度なユース ケース 自動ローダー を使用することをお勧めします。

このチュートリアルでは、COPY INTO コマンドを使用して、クラウド オブジェクト ストレージから Azure Databricks ワークスペース内のテーブルにデータを読み込みます。

必要条件

  1. Azure サブスクリプション、そのサブスクリプション内の Azure Databricks ワークスペース、およびそのワークスペース内のクラスター。 これらを作成するには、「クイック スタート: Azure portalを使用して Azure Databricks ワークスペースで Spark ジョブを実行する」を参照してください。 このクイック スタートに従う場合は、「Spark SQL ジョブ の実行」セクションの手順に従う必要はありません。
  2. ワークスペース内で Databricks Runtime 11.3 LTS 以降を実行している、汎用的な クラスター。 汎用クラスターを作成するには、「コンピューティング構成リファレンス参照してください。
  3. Azure Databricks ワークスペースのユーザー インターフェイスに関する知識。 「ワークスペースのを移動する」を参照してください。
  4. Databricks ノートブックの操作に関する知識。
  5. データを書き込むことができる場所。このデモでは DBFS ルートを例として使用していますが、Databricks では Unity カタログで構成された外部ストレージの場所をお勧めします。

手順 1. 環境を構成し、データ ジェネレーターを作成する

このチュートリアルでは、Azure Databricks と既定のワークスペース構成に関する基本的な知識を前提としています。 指定されたコードを実行できない場合は、ワークスペース管理者に連絡して、コンピューティング リソースへのアクセス権と、データを書き込むことができる場所を確認してください。

指定されたコードでは、source パラメーターを使用して、COPY INTO データ ソースとして構成する場所を指定します。 記述されているように、このコードは DBFS ルート上の場所を指します。 外部オブジェクトストレージの場所に対する書き込み権限がある場合は、ソース文字列の dbfs:/ 部分をオブジェクトストレージへのパスに置き換えます。 このコード ブロックでは、このデモをリセットするための再帰的な削除も行われるため、運用環境のデータでこれを指さないことを確認し、既存のデータを上書きまたは削除しないように、/user/{username}/copy-into-demo 入れ子になったディレクトリを保持してください。

  1. 新しい SQL ノートブック を作成し、それを Databricks Runtime 11.3 LTS 以降を実行するクラスター に接続 します。

  2. 次のコードをコピーして実行し、このチュートリアルで使用するストレージの場所とデータベースをリセットします。

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 次のコードをコピーして実行して、データをランダムに生成するために使用されるテーブルと関数をいくつか構成します。

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

手順 2: サンプル データをクラウド ストレージに書き込む

Delta Lake 以外のデータ形式への書き込みは、Azure Databricks ではまれです。 ここで示すコードは JSON に書き込み、別のシステムからオブジェクト ストレージに結果をダンプする可能性のある外部システムをシミュレートします。

  1. 次のコードをコピーして実行して、生の JSON データのバッチを書き込みます。

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

手順 3: COPY INTO を使用して JSON データをべき等に読み込む

COPY INTOを使用する前に、ターゲット Delta Lake テーブルを作成する必要があります。 Databricks Runtime 11.3 LTS 以降では、CREATE TABLE ステートメントにテーブル名以外のものを指定する必要はありません。 以前のバージョンの Databricks Runtime では、空のテーブルを作成するときにスキーマを指定する必要があります。

  1. 次のコードをコピーして実行し、ターゲットの Delta テーブルを作成し、ソースからデータを読み込みます。

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

このアクションはべき等であるため、複数回実行できますが、データは 1 回だけ読み込まれます。

手順 4: テーブルの内容をプレビューする

簡単な SQL クエリを実行して、このテーブルの内容を手動で確認できます。

  1. 次のコードをコピーして実行して、テーブルをプレビューします。

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

手順 5: より多くのデータを読み込み、結果をプレビューする

手順 2 から 4 を何度も実行し直して、ランダムな生 JSON データの新しいバッチをソースに配置し、COPY INTO を使ってそれを Delta Lake にべき等に読み込んで、結果をプレビューできます。 これらの手順を順不同でまたは複数回実行して、生データの複数のバッチが書き込まれる様子をシミュレートしたり、新しいデータが到着しない状況で COPY INTO を複数回実行してみてください。

手順 6: チュートリアルをクリーンアップする

このチュートリアルが完了したら、関連付けられているリソースを保持する必要がなくなったら、クリーンアップできます。

  1. 次のコードをコピーして実行して、データベース、テーブルを削除し、すべてのデータを削除します。

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. コンピューティング リソースを停止するには、[クラスター] タブに移動し、クラスター 終了 します。

その他のリソース