共用方式為


在 Apache Airflow 作業中安裝私人套件作為需求

注意

Apache Airflow 作業是由 Apache Airflow 提供電源。

Python 套件是將相關 Python 模組組織成單一目錄階層的方法。 套件通常會以目錄表示,其中包含稱為 init.py 的特殊檔案。 在套件目錄內,您可以有多個可定義函數、類別和變數的 Python 模組檔案 (.py 檔案)。 在 Apache Airflow 作業的內容中,您可以開發私人套件,以新增自定義 Apache Airflow 操作員、勾點、感測器、外掛程式等。

在本教學課程中,您將建立簡單的自定義運算子作為 Python 套件、將它新增為 Apache Airflow 作業環境中的需求,並將私人套件匯入為 DAG 檔案內的模組。

使用 Apache Airflow DAG 開發自訂運算子並進行測試

  1. 建立檔案 sample_operator.py,並將它轉換成私人套件。 請參閱指南:在 Python 中建立套件

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 建立 Apache Airflow DAG 檔案 sample_dag.py 以測試步驟 1 中所定義的運算子。

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. 建立在 Dags 資料夾中包含 sample_dag.py 的 GitHub 存放庫和您的私人套件檔案。 常見的檔案格式包括 zip.whltar.gz。 將檔案適當地放入 'Dags' 或 'Plugins' 資料夾。 同步處理 Git 存放庫與 Apache Airflow 作業,或使用預先設定的存放庫Install-Private-Package

將套件新增為需求

Airflow requirements 下新增套件為需求。 請使用 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage> 格式

例如,如果您的私人套件位於 GitHub 存放庫中的 /dags/test/private.whl,則將需求 /opt/airflow/git/<repoName>.git/dags/test/private.whl 新增至 Airflow 環境。

螢幕擷取畫面,其中顯示已新增為需求之私人套件。

快速入門:建立 Apache Airflow 作業