次の方法で共有


Git フォルダーまたはワークスペース ファイルから Python モジュールをインポートする

Python コードを Databricks Git フォルダーまたはワークスペース ファイルに格納し、その Python コードを Delta Live Tables パイプラインにインポートできます。 Git フォルダーまたはワークスペース ファイルでモジュールを使用する方法の詳細については、「Python と R のモジュールを使用する」を参照してください。

Note

Databricks Git フォルダーまたはワークスペース ファイルに保存されているノートブックからソース コードをインポートすることはできません。 代わりに、パイプラインを作成または編集するときにノートブックを直接追加します。 「 デルタ ライブ テーブル パイプラインを構成するを参照してください。

Python モジュールを Delta Live Tables パイプラインにインポートする

次の例では、ワークスペース ファイルから Python モジュールとしてデータセット クエリをインポートする方法を示します。 この例では、ワークスペース ファイルを使用してパイプラインのソース コードを格納する方法について説明しますが、これを Git フォルダーに格納するソース コードで使用できます。

このサンプルを実行するには、次の手順を使用します。

  1. Azure Databricks ワークスペースのサイドバーで ワークスペース アイコン [ワークスペース] をクリックして、ワークスペース ブラウザーを開きます。

  2. ワークスペース ブラウザーを使用して、Python モジュールのディレクトリを選択します。

  3. 選択したディレクトリの右端の列で Kebab メニュー をクリックし、[ファイル]>[作成] をクリックします。

  4. ファイルの名前を入力します (例: clickstream_raw_module.py)。 ファイル エディターが開きます。 ソース データをテーブルに読み込むモジュールを作成するには、エディター ウィンドウで次のように入力します。

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 準備されたデータを含む新しいテーブルを作成するモジュールを作成するには、同じディレクトリに新しいファイルを作成し、ファイルの名前 (例: clickstream_prepared_module.py) を入力します。新しいエディター ウィンドウに次のように入力します。

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 次に、パイプライン ノートブックを作成します。 Azure Databricks のランディング ページにアクセスし、[ノートブックの作成] を選択するか、サイドバーで 新規アイコン [新規] をクリックして、メニューから [ノートブック] を選択します。 Kebab メニュー をクリックし、[作成]>[ノートブック] をクリックして、ワークスペース ブラウザーでノートブックを作成することもできます。

  7. ノートブックに名前を付け、Python が既定の言語であることを確認します。

  8. Create をクリックしてください。

  9. ノートブックにコード例を入力します。

    Note

    ノートブック ディレクトリとは異なるワークスペース ファイル パスまたは Git フォルダー パスからノートブックでモジュールやパッケージをインポートする場合は、sys.path.append() を使用してファイルへのパスを手動で追加する必要があります。

    Git フォルダーからファイルをインポートする場合は、パスの前に /Workspace/ を追加する必要があります。 たとえば、sys.path.append('/Workspace/...') のようにします。 パスから /Workspace/ を省略すると、エラーが発生します。

    モジュールまたはパッケージがノートブックと同じディレクトリに格納されている場合は、パスを手動で追加する必要はありません。 また、ルート ディレクトリはパスに自動的に追加されるため、Git フォルダーのルート ディレクトリからインポートするときに、パスを手動で追加する必要はありません。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path> を、インポートする Python モジュールを含むディレクトリへのパスに置き換えます。

  10. 新しいノートブックを使用してパイプラインを作成します。

  11. パイプラインを実行するには、[パイプラインの詳細] ページで [開始] をクリックします。

Python コードをパッケージとしてインポートすることもできます。 Delta Live Tables ノートブックの次のコード スニペットでは、ノートブックと同じディレクトリ内の test_utils ディレクトリから dlt_packages パッケージがインポートされます。 dlt_packages ディレクトリには test_utils.py ファイルと __init__.py ファイルが含まれ、test_utils.py では create_test_table() 関数が定義されます。

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)