在 Apache Airflow 作业中按要求安装专用包
注意
Apache Airflow 作业由 Apache Airflow 提供支持。
Python 包是一种将相关 Python 模块组织到单个目录层次结构中的方法。 包通常表示为包含名为 init.py 的特殊文件的目录。 在包目录中,可以有多个用于定义函数、类和变量的 Python 模块文件(.py 文件)。 在 Apache Airflow 作业的上下文中,可以开发专用包来添加自定义 Apache Airflow 运算符、挂钩、传感器、插件等。
在本教程中,你将创建一个简单的自定义运算符作为 Python 包,按要求将其添加到 Apache Airflow 作业环境中,并导入专用包作为 DAG 文件中的模块。
开发自定义运算符并使用 Apache Airflow Dag 进行测试
创建文件
sample_operator.py
并将其转换为专用包。 请参阅指南:在 python 中创建包from airflow.models.baseoperator import BaseOperator class SampleOperator(BaseOperator): def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" return message
创建 Apache Airflow DAG 文件
sample_dag.py
以测试步骤 1 中定义的操作符。from datetime import datetime from airflow import DAG # Import from private package from airflow_operator.sample_operator import SampleOperator with DAG( "test-custom-package", tags=["example"] description="A simple tutorial DAG", schedule_interval=None, start_date=datetime(2021, 1, 1), ) as dag: task = SampleOperator(task_id="sample-task", name="foo_bar") task
创建一个 GitHub 仓库,其中包含
Dags
文件夹中的sample_dag.py
和你的专用包文件。 常见的文件格式包括zip
、.whl
或tar.gz
。 根据需要将文件放在“Dags”或“Plugins”文件夹中。 将 Git 存储库与 Apache Airflow 作业同步,也可以使用预配置的存储库 Install-Private-Package
将专用包作为要求添加
在 Airflow requirements
下将包添加为要求。 使用格式 /opt/airflow/git/<repoName>.git/<pathToPrivatePackage>
例如,如果专用包位于 GitHub 仓库的 /dags/test/private.whl
,请将需求 /opt/airflow/git/<repoName>.git/dags/test/private.whl
添加到 Airflow 环境中。