在 Apache Airflow 作业中按要求安装专用包

注意

Apache Airflow 作业由 Apache Airflow 提供支持。

Python 包是一种将相关 Python 模块组织到单个目录层次结构中的方法。 包通常表示为包含名为 init.py 的特殊文件的目录。 在包目录中,可以有多个用于定义函数、类和变量的 Python 模块文件(.py 文件)。 在 Apache Airflow 作业的上下文中,可以开发专用包来添加自定义 Apache Airflow 运算符、挂钩、传感器、插件等。

在本教程中,你将创建一个简单的自定义运算符作为 Python 包,按要求将其添加到 Apache Airflow 作业环境中,并导入专用包作为 DAG 文件中的模块。

开发自定义运算符并使用 Apache Airflow Dag 进行测试

  1. 创建文件 sample_operator.py 并将其转换为专用包。 请参阅指南:在 python 中创建包

    from airflow.models.baseoperator import BaseOperator
    
    
    class SampleOperator(BaseOperator):
        def __init__(self, name: str, **kwargs) -> None:
            super().__init__(**kwargs)
            self.name = name
    
        def execute(self, context):
            message = f"Hello {self.name}"
            return message
    
    
  2. 创建 Apache Airflow DAG 文件 sample_dag.py 以测试步骤 1 中定义的操作符。

    from datetime import datetime
    from airflow import DAG
    
     # Import from private package
    from airflow_operator.sample_operator import SampleOperator
    
    
    with DAG(
    "test-custom-package",
    tags=["example"]
    description="A simple tutorial DAG",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    ) as dag:
        task = SampleOperator(task_id="sample-task", name="foo_bar")
    
        task
    
  3. 创建一个 GitHub 仓库,其中包含 Dags 文件夹中的 sample_dag.py 和你的专用包文件。 常见的文件格式包括 zip.whltar.gz。 根据需要将文件放在“Dags”或“Plugins”文件夹中。 将 Git 存储库与 Apache Airflow 作业同步,也可以使用预配置的存储库 Install-Private-Package

将专用包作为要求添加

Airflow requirements 下将包添加为要求。 使用格式 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>

例如,如果专用包位于 GitHub 仓库的 /dags/test/private.whl,请将需求 /opt/airflow/git/<repoName>.git/dags/test/private.whl 添加到 Airflow 环境中。

该屏幕截图显示了按要求添加的专用包。

快速入门:创建 Apache Airflow 作业