Azure Data Factory 워크플로 오케스트레이션 매니저를 사용하는 Apache Flink® 작업 오케스트레이션(Apache Airflow에서 구동)
참고 항목
2025년 1월 31일에 Azure HDInsight on AKS가 사용 중지됩니다. 2025년 1월 31일 이전에 워크로드가 갑자기 종료되지 않도록 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 마이그레이션해야 합니다. 구독의 나머지 클러스터는 호스트에서 중지되고 제거됩니다.
사용 중지 날짜까지 기본 지원만 사용할 수 있습니다.
Important
이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 AskHDInsight에서 세부 정보와 함께 요청을 제출하고 Azure HDInsight 커뮤니티에서 더 많은 업데이트를 확인하세요.
이 문서에서는 Azure REST API 및 Azure Data Factory 워크플로 오케스트레이션 매니저를 통한 오케스트레이션 데이터 파이프라인을 사용하여 Flink 작업을 관리하는 방법을 다룹니다. Azure Data Factory 워크플로 오케스트레이션 매니저 서비스는 Apache Airflow 환경을 만들고 관리하는 간단하고 효율적인 방법으로, 대규모 데이터 파이프라인을 쉽게 실행할 수 있도록 해줍니다.
Apache Airflow는 복잡한 데이터 워크플로를 프로그래밍 방식으로 만들기, 예약 및 모니터링하는 오픈 소스 플랫폼입니다. 이를 통해 데이터 파이프라인을 나타내기 위해 DAG(방향성 비순환 그래프)로 결합할 수 있는 연산자라고 하는 일련의 작업을 정의할 수 있습니다.
다음 다이어그램은 Azure의Airflow, Key Vault 및 HDInsight on AKS의 배치를 보여 줍니다.
필요한 액세스를 제한하고 클라이언트 자격 증명 수명 주기를 독립적으로 관리하기 위해 범위에 따라 여러 Azure 서비스 주체가 만들어집니다.
액세스 키나 비밀을 주기적으로 회전하는 것이 좋습니다.
설정 단계
Flink 작업 jar을 스토리지 계정에 업로드합니다. Flink 클러스터와 연결된 기본 스토리지 계정이거나 이 스토리지 계정의 클러스터에 사용되는 사용자 할당 MSI에 "Storage Blob 데이터 소유자" 역할을 할당해야 하는 다른 스토리지 계정일 수 있습니다.
Azure Key Vault - Azure Key Vault가 없는 경우 이 자습서에 따라 새 Azure Key Vault를 만들 수 있습니다.
Key Vault에 액세스하기 위한 Microsoft Entra 서비스 주체 만들기 - "Key Vault Secrets Officer" 역할로 Azure Key Vault에 액세스할 수 있는 권한을 부여하고 응답에서 ‘appId’ ‘암호’ 및 ‘테넌트’를 기록해 둡니다. 중요한 정보를 저장하기 위한 백 엔드로 Key Vault 스토리지를 사용하려면 Airflow에도 동일한 기능을 사용해야 합니다.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
워크플로 오케스트레이션 매니저용 Azure Key Vault를 사용하도록 설정하면 중요한 정보를 안전하고 중앙 집중화된 방식으로 저장하고 관리할 수 있습니다. 이렇게 하면 변수와 연결을 사용할 수 있으며 자동으로 Azure Key Vault에 저장됩니다. 연결 및 변수의 이름에는 AIRFLOW__SECRETS__BACKEND_KWARGS에 정의된 Variable_prefix가 앞에 와야 합니다. 예를 들어, Variable_prefix의 값이 hdinsight-aks-variables인 경우 변수 키 hello의 경우 변수를 hdinsight-aks-variable -hello에 저장할 수 있습니다.
통합 런타임 속성에서 Airflow 구성 재정의에 대해 다음 설정을 추가합니다.
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Airflow 통합 런타임 속성에서 환경 변수 구성에 대해 다음 설정을 추가합니다.
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Airflow 요구 사항 추가 - apache-airflow-providers-microsoft-azure
Azure에 액세스하기 위한 Microsoft Entra 서비스 주체 만들기 – 기여자 역할로 HDInsight AKS 클러스터에 액세스할 수 있는 권한을 부여하고 응답에서 appId, 암호 및 테넌트를 기록해 둡니다.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
AIRFLOW__SECRETS__BACKEND_KWARGS에 정의된 Variable_prefix 속성이 앞에 붙은 이전 AD 서비스 주체 appId, 비밀 및 테넌트의 값을 사용하여 키 자격 증명 모음에 다음 비밀을 만듭니다. DAG 코드는 Variable_prefix 없이 이러한 변수에 액세스할 수 있습니다.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 정의
DAG(방향성 비순환 Graph)는 Airflow의 핵심 개념으로, 작업을 함께 수집하고 종속성과 관계로 구성하여 실행 방법을 알려 줍니다.
DAG를 선언하는 방법에는 세 가지가 있습니다.
암시적으로 DAG 내부의 모든 항목에 DAG를 추가하는 컨텍스트 관리자를 사용할 수 있습니다.
표준 생성자를 사용하여 DAG를 사용하는 연산자에 전달할 수 있습니다.
@dag 데코레이터를 사용하여 함수를 DAG 생성기로 전환할 수 있습니다(airflow.decorators import dag에서).
실행할 작업이 없으면 DAG는 아무 것도 아니며 연산자, 센서 또는 작업 흐름의 형태로 제공됩니다.
DAG, 제어 흐름, SubDAG, TaskGroup 등에 대한 자세한 내용은 Apache Airflow에서 직접 읽을 수 있습니다.
DAG 실행
코드 예는 git에서 확인할 수 있습니다. 코드를 컴퓨터에 로컬로 다운로드하고 wordcount.py를 Blob Storage에 업로드합니다. 단계에 따라 설정 중에 만들어진 워크플로로 DAG를 가져오세요.
wordcount.py는 HDInsight on AKS와 함께 Apache Airflow를 사용하여 Flink 작업 제출을 오케스트레이션하는 예입니다. DAG에는 두 가지 작업이 있습니다.
get
OAuth Token
HDInsight Flink 작업 제출 Azure REST API를 호출하여 새 작업을 제출합니다.
DAG는 OAuth 클라이언트 자격 증명 설정 프로세스 중에 설명된 대로 서비스 주체에 대한 설정을 갖고 실행을 위해 다음 입력 구성을 전달할 것으로 예상합니다.
실행 단계
Airflow UI에서 DAG를 실행하면 모니터 아이콘을 클릭하여 Azure Data Factory 워크플로 오케스트레이션 매니저 UI를 열 수 있습니다.
"DAG" 페이지에서 "FlinkWordCountExample" DAG를 선택합니다.
오른쪽 상단에서 "실행" 아이콘을 클릭하고 "구성으로 DAG 트리거"를 선택합니다.
필수 구성 JSON 전달
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
"트리거" 단추를 클릭하면 DAG 실행이 시작됩니다.
DAG 실행에서 DAG 작업 상태를 시각화할 수 있습니다.
포털에서 작업 실행 유효성 검사
"Apache Flink 대시보드"에서 작업 유효성 검사
예제 코드
이는 HDInsight on AKS와 함께 Airflow를 사용하여 데이터 파이프라인을 오케스트레이션하는 예입니다.
DAG는 OAuth 클라이언트 자격 증명에 대한 서비스 주체를 설정하고 실행을 위해 다음 입력 구성을 전달할 것으로 예상합니다.
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
참조
- 샘플 코드를 참조하세요.
- Apache Flink 웹 사이트
- Apache, Apache Airflow, Airflow, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 ASF(Apache Software Foundation)의 상표입니다.