Azure Data Factory 워크플로 오케스트레이션 관리자를 사용하여 Apache Flink® 작업 오케스트레이션(Apache Airflow에서 구동)
중요하다
AKS의 Azure HDInsight는 2025년 1월 31일에 사용 중지되었습니다. 에 대해이 공지를 통해 자세히 알아보세요.
워크로드가 갑자기 종료되는 것을 방지하기 위해 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 워크로드를 마이그레이션해야 합니다.
중요하다
이 기능은 현재 미리 보기로 제공됩니다. Microsoft Azure 미리 보기의 추가 사용 약관은 베타 버전, 미리 보기 또는 아직 정식으로 출시되지 않은 Azure 기능에 적용되는 추가적인 법적 조건을 포함합니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight AKS 미리 보기 정보을 참조하세요. 질문이나 기능 제안을 하시려면 AskHDInsight 에 요청을 제출해 주시고, 더 많은 업데이트를 원하시면 Azure HDInsight Community를 팔로우해 주세요.
이 문서에서는 Azure Data Factory 워크플로 오케스트레이션 관리자를 사용하여 Azure REST API 및 오케스트레이션 데이터 파이프라인을 사용하여 Flink 작업을 관리하는 방법에 대해 설명합니다. Azure Data Factory 워크플로 오케스트레이션 관리자 서비스는 Apache Airflow 환경을 만들고 관리하여 데이터 파이프라인을 대규모로 쉽게 실행할 수 있는 간단하고 효율적인 방법입니다.
Apache Airflow는 복잡한 데이터 워크플로를 프로그래밍 방식으로 만들고, 예약하고, 모니터링하는 오픈 소스 플랫폼입니다. 이를 통해 데이터 파이프라인을 나타내기 위해 지시된 DAG(순환 그래프)로 결합할 수 있는 연산자라는 작업 집합을 정의할 수 있습니다.
다음 다이어그램에서는 Azure의 AKS에 Airflow, Key Vault 및 HDInsight를 배치하는 방법을 보여 줍니다.
필요한 액세스를 제한하고 클라이언트 자격 증명 수명 주기를 독립적으로 관리하기 위해 범위에 따라 여러 Azure 서비스 주체가 만들어집니다.
액세스 키 또는 비밀을 주기적으로 회전하는 것이 좋습니다.
설정 단계
Flink 작업 Jar 파일을 스토리지 계정에 업로드합니다. Flink 클러스터 또는 다른 스토리지 계정과 연결된 기본 스토리지 계정일 수 있습니다. 여기서 이 스토리지 계정의 클러스터에 사용되는 사용자 할당 MSI에 "Storage Blob 데이터 소유자" 역할을 할당해야 합니다.
Azure Key Vault - 이 자습서에 따라 새 Azure Key Vault 만들 수 있습니다(없는 경우).
Microsoft Entra 서비스 주체을 만들어 Key Vault에 액세스하십시오 – "Key Vault 비밀 책임자" 역할을 통해 Azure Key Vault에 접근 권한을 부여하고, 응답에서 'appId', 'password', 및 'tenant'를 기록하십시오. 중요한 정보를 저장하기 위해 Key Vault 스토리지를 백 엔드로 사용하려면 Airflow에 동일한 기능을 사용해야 합니다.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
워크플로 오케스트레이션 관리자 Azure Key Vault를 사용하여 중요한 정보를 안전하고 중앙 집중식으로 저장하고 관리할 수 있습니다. 이렇게 하면 변수 및 연결을 사용할 수 있으며 Azure Key Vault에 자동으로 저장됩니다. 연결 및 변수의 이름은 AIRFLOW__SECRETS__BACKEND_KWARGS 정의된 variables_prefix 접두사로 지정해야 합니다. 예를 들어, variables_prefix에 hdinsight-aks-variables라는 값이 있는 경우, hello의 변수 키에 대해 hdinsight-aks-variable-hello에 변수를 저장하려고 할 것입니다.
통합 런타임 속성에서 Airflow 구성 재정의에 대해 다음 설정을 추가합니다.
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Airflow 통합 런타임 속성에서 환경 변수 구성에 대해 다음 설정을 추가합니다.
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
에어플로우 요구 사항 추가 - apache-airflow-providers-microsoft-azure
Azure에 액세스하는 Microsoft Entra 서비스 주체 만들기 - 기여자 역할을 사용하여 HDInsight AKS 클러스터에 액세스할 수 있는 권한을 부여하고 응답에서 appId, 암호 및 테넌트를 기록해 둡니다.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
AIRFLOW__SECRETS__BACKEND_KWARGS 내에 정의된 속성 variables_prefix로 접두사를 붙인 이전 AD 서비스 주체의 appId, 암호 및 테넌트 값을 사용하여 키 자격증명 모음에 다음 비밀들을 만드세요. DAG 코드는 variables_prefix 없이 이러한 변수에 액세스할 수 있습니다.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 정의
DAG(Directed Acyclic Graph)는 Airflow의 핵심 개념으로, 작업을 함께 수집하고 종속성 및 관계로 구성하여 실행 방법을 설명합니다.
DAG를 선언하는 세 가지 방법이 있습니다.
컨텍스트 관리자를 사용하여 DAG를 암시적으로 내부 모든 항목에 추가할 수 있습니다.
표준 생성자를 사용하여 DAG를 사용하는 모든 연산자에 전달할 수 있습니다.
@dag 데코레이터를 사용하여 함수를 DAG 생성기로 변환할 수 있습니다(airflow.decorators import dag에서).
DAG는 실행할 작업이 없으면 아무것도 없으며 연산자, 센서 또는 TaskFlow 형식으로 제공됩니다.
DAG, 제어 흐름, SubDAG, 작업 그룹 등에 대한 자세한 내용은 Apache Airflow직접 확인할 수 있습니다.
DAG 실행
예제 코드는 git;에서 사용할 수 있습니다. 컴퓨터에서 로컬로 코드를 다운로드하고 blob Storage에 wordcount.py 업로드합니다. 단계에 따라 설치 중에 만든 워크플로로 DAG를 가져옵니다.
이 wordcount.py AKS에서 HDInsight와 함께 Apache Airflow를 사용하여 Flink 작업 제출을 오케스트레이션하는 예제입니다. DAG에는 다음 두 가지 작업이 있습니다.
OAuth Token
가져오기HDInsight Flink 작업 제출 Azure REST API를 호출하여 새 작업 제출
DAG는 OAuth 클라이언트 자격 증명에 대한 설치 프로세스 중에 설명된 대로 서비스 주체에 대한 설정이 완료되어 있을 것으로 기대하며, 실행을 위해 다음 입력 구성을 전달해야 합니다.
실행 단계
Airflow UIDAG를 실행합니다. 모니터 아이콘을 클릭하여 Azure Data Factory 워크플로 오케스트레이션 관리자 UI를 열 수 있습니다.
"DAG" 페이지에서 "FlinkWordCountExample" DAG를 선택합니다.
오른쪽 위 모서리에 있는 "실행" 아이콘을 클릭하고 "구성으로 DAG 실행"을 선택합니다.
필수 구성 JSON 전달
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
"트리거" 단추를 클릭하면 DAG 실행이 시작됩니다.
DAG 실행에서 DAG 작업의 상태를 시각화할 수 있습니다.
포털에서 작업 실행 유효성 검사
"Apache Flink 대시보드"에서 작업 유효성 검사
예제 코드
AKS에서 HDInsight와 함께 Airflow를 사용하여 데이터 파이프라인을 오케스트레이션하는 예제입니다.
DAG는 OAuth 클라이언트 자격 증명에 대한 서비스 주체 설정을 예상하고, 실행을 위해 다음 입력 구성을 전달해야 합니다.
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
참조
- 샘플 코드참조하세요.
- Apache Flink 웹 사이트
- Apache, Apache Airflow, Airflow, Apache Flink, Flink 및 관련 오픈 소스 프로젝트 이름은 Apache Software Foundation(ASF)의 상표입니다.