Apache Airflow ジョブの要件としてプライベート パッケージをインストールする
Note
Apache Airflow ジョブは、Apache Airflow を使用しています。
Python パッケージは、関連する Python モジュールを単一のディレクトリ階層に整理する方法です。 パッケージは通常、init.py という特殊なファイルを含むディレクトリとして表されます。 パッケージ ディレクトリ内には、関数、クラス、変数を定義した複数の Python モジュール ファイル (.py ファイル) を含めることができます。 Apache Airflow ジョブのコンテキストでは、カスタムの Apache Airflow 演算子、フック、センサー、プラグインなどを追加するプライベート パッケージを開発できます。
このチュートリアルでは、シンプルなカスタム演算子を Python パッケージとして作成し、それを Apache Airflow ジョブ環境に要件として追加し、プライベート パッケージを DAG ファイル内のモジュールとしてインポートします。
Apache Airflow Dag を使用してカスタム演算子を開発し、テストする
ファイル
sample_operator.py
を作成し、プライベート パッケージに変換します。 ガイド:「Python でのパッケージの作成」を参照してくださいfrom airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
手順 1 で定義された演算子をテストするApache Airflow DAG ファイル
sample_dag.py
を作成します。from datetime import datetime from airflow import DAG # Import from private package from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
Dags
フォルダーとプライベート パッケージ ファイル内にsample_dag.py
を含む GitHub リポジトリを作成します。 一般的なファイル形式には、zip
、.whl
、tar.gz
があります。 適切な "Dags" フォルダーまたは "Plugins" フォルダー内にファイルを配置します。 Git リポジトリを Apache Airflow ジョブと同期します。構成済みのリポジトリ Install-Private-Package を使用することもできます
パッケージを要件として追加する
Airflow requirements
でパッケージを要件として追加します。 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>
という形式を使用します
たとえば、プライベート パッケージが GitHub リポジトリの /dags/test/private.whl
にある場合は、Airflow 環境に要件 /opt/airflow/git/<repoName>.git/dags/test/private.whl
を追加します。