在 Apache Airflow 作業中安裝私人套件作為需求
注意
Apache Airflow 作業是由 Apache Airflow 提供電源。
Python 套件是將相關 Python 模組組織成單一目錄階層的方法。 套件通常會以目錄表示,其中包含稱為 init.py 的特殊檔案。 在套件目錄內,您可以有多個可定義函數、類別和變數的 Python 模組檔案 (.py 檔案)。 在 Apache Airflow 作業的內容中,您可以開發私人套件,以新增自定義 Apache Airflow 操作員、勾點、感測器、外掛程式等。
在本教學課程中,您將建立簡單的自定義運算子作為 Python 套件、將它新增為 Apache Airflow 作業環境中的需求,並將私人套件匯入為 DAG 檔案內的模組。
使用 Apache Airflow DAG 開發自訂運算子並進行測試
建立檔案
sample_operator.py
,並將它轉換成私人套件。 請參閱指南:在 Python 中建立套件from airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
建立 Apache Airflow DAG 檔案
sample_dag.py
以測試步驟 1 中所定義的運算子。from datetime import datetime from airflow import DAG # Import from private package from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
建立在
Dags
資料夾中包含sample_dag.py
的 GitHub 存放庫和您的私人套件檔案。 常見的檔案格式包括zip
、.whl
或tar.gz
。 將檔案適當地放入 'Dags' 或 'Plugins' 資料夾。 同步處理 Git 存放庫與 Apache Airflow 作業,或使用預先設定的存放庫Install-Private-Package
將套件新增為需求
在 Airflow requirements
下新增套件為需求。 請使用 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>
格式
例如,如果您的私人套件位於 GitHub 存放庫中的 /dags/test/private.whl
,則將需求 /opt/airflow/git/<repoName>.git/dags/test/private.whl
新增至 Airflow 環境。