AWS 기술 블로그
AWS DataZone에서 OpenLineage 기반의 Airflow 데이터 계보 그리기
배경
Airflow는 데이터 마트(Data Mart)를 포함한 데이터 파이프라인 구축 및 관리에서 매우 널리 사용되는 도구입니다. 이러한 Airflow에서 데이터 계보가 중요한 이유는 데이터의 출처와 변환 과정을 명확히 추적할 수 있어 데이터의 신뢰성을 보장하고, 문제 발생 시 원인을 빠르게 파악할 수 있기 때문입니다. 또한, 데이터 계보는 규제 준수와 감사 요구사항을 충족시키는 데 도움을 주며, 데이터 파이프라인의 변경이 미치는 영향을 사전에 분석하여 위험을 최소화할 수 있게 합니다. 이를 통해 데이터 품질 문제를 쉽게 식별하고 해결할 수 있으며, 오류 발생 시 문제의 근원을 빠르게 찾아 다운타임을 줄이고 시스템 안정성을 높일 수 있습니다. 나아가 데이터 계보는 조직 내 데이터 거버넌스 정책을 강화하여 데이터 소유권, 사용 권한, 보안 정책 등을 효과적으로 관리할 수 있도록 지원합니다. 결국 데이터 계보를 통해 복잡한 데이터 파이프라인을 효율적으로 운영하며, 데이터 기반 의사결정의 신뢰성과 조직의 데이터 활용 능력을 향상시킬 수 있습니다.
솔루션 개요
이번 포스팅에서는 Amazon DataZone의 데이터 계보 기능을 활용하여 Airflow에 대한 데이터 계보를 그리는 방법에 대해서 설명합니다. Amazon DataZone 서비스에서는 DataZone 라이프 사이클 내에서 발생한 게시와 구독 이벤트에 대한 데이터 계보만 표현되며, 그 외의 경우에는 사용자가 데이터 계보를 직접 작성해야 합니다. 이를 위해 AWS Lambda를 사용하여 Airflow에 대한 데이터 계보를 OpenLineage 표준에 맞게 Amazon DataZone에 업데이트하는 방법에 대해서 설명합니다.
Airflow에 대한 데이터 계보를 그리면 아래와 같습니다. 이 그림을 보면 오른쪽 노드에 위치한 Output Node은 왼쪽 Input Node 3개를 사용해서 만들어 진 것을 알 수 있습니다. 그리고 이러한 Airflow Job은 COMPLETE 상태이며 이는 데이터 마트 생성 Job이 성공하였다는 것을 의미합니다.
이외에도 Airflow Job에 대한 상세 정보도 아래와 같이 알 수 있습니다.
솔루션 아키텍처
전체적인 아키텍처는 Prodcution 계정과 Governance 계정으로 구성되며, 두 계정은 Transit Gateway를 통해 연결되어 네트워크 통신이 가능하도록 설정됩니다. 먼저, Prodcution계정에서는 데이터의 저장 및 메타데이터 생성 작업이 이루어집니다. 관계형 데이터베이스(RDS)는 데이터를 저장하고, 사용자의 요청에 따라 뷰 테이블(View Tables)을 생성합니다. 이후 AWS Glue Crawler가 RDS 데이터를 스캔하여 메타데이터를 자동으로 생성하며, 이 메타데이터는 Glue Data Catalog에 저장됩니다.
Amazon Managed Workflows for Apache Airflow(MWAA)에서 매일 View 테이블을 갱신하는 Airflow Job이 배치 형태로 동작하게 됩니다. Airflow Job이 동작할 때 마다 로그 정보를 CloudWatch에 저장하게 됩니다. 이 로그 정보는 Assume Role을 사용해서 Governance 계정에서 조회될 수 있도록 구성합니다. Governance계정에서는 Airflow에 대한 데이터 계보 작성과 사용자 접근 제어가 이루어집니다. Lambda 함수는 매일 24시간 동안 Airflow Job에 대한 로그를 가져옵니다. 이 로그에서 프로시저 함수를 가져오고 함수 이름을 이용해 프로시저 정의 요청 쿼리문을 이용해 Input Table과 Output Table 이름을 조회합니다. 조회한 정보와 Airflow Job 정보를 조합한 후 데이터 계보를 Amazon DataZone에 업데이트 합니다.
사용자는 Data Portal을 통해 이러한 메타데이터와 계보 정보를 검색하고 필요한 데이터 자산에 접근할 수 있습니다. Data Portal은 SSO(Single Sign-On) 기능을 제공하는 Identity Center와 연동되어 사용자 인증을 처리하며, 이를 통해 안전한 데이터 접근이 가능합니다. Governance계정 내에서는 Step Functions가 워크플로우를 관리하며, Secrets Manager를 통해 민감한 정보를 안전하게 저장하고 관리합니다. 또한 Lambda 및 기타 서비스에서 발생하는 로그는 CloudWatch Logs에 기록되어 모니터링과 분석에 활용됩니다.
사전 준비 사항
다음과 같은 사항이 사전에 준비되어야 합니다.
- Amazon RDS for PostgreSQL에 스키마 및 테이블 생성되어 있어야 함
- Amazon DataZone에 Table과 View Table이 자산으로 등록되어 있어야 함
- Amazon Managed Workflows for Apache Airflow(MWAA)에 Airflow Job이 구성되어 있어야 함
- Daily Batch로 프로시저 함수(View Table 갱신 함수)가 실행되고 있어야 함
RDBMS View Table 계보 작성 단계 요약
- 단계 1: Amazon Managed Workflows for Apache Airflow(MWAA)에 OpenLineage구성
- 단계 2: Airflow 로그 Assume Role을 이용한 로그 공유
- 단계 3: CloudWatch 로그 쿼리
- 단계 4: 프로시저 정의 요청 쿼리
- 단계 5: OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석
- 단계 6: 데이터 계보 작성 및 업데이트
단계 1: Amazon Managed Workflows for Apache Airflow(MWAA)에 OpenLineage 구성
Amazon Managed Workflows for Apache Airflow(MWAA)에 OpenLineage 구성을 위해서는 MWAA 내 DAG 스크립트가 실행되는 이벤트가 1) OpenLineage 형태로 이벤트가 생성되고, 2) 이 이벤트가 CloudWatch Logs에 기록됩니다. 이를 특정 3) 람다 함수(Extract Lineage Events)가 Parsing 해서 DataZone 데이터 계보 이벤트로 전달하게 됩니다. 이를 통해서 데이터 포털에서 MWAA에 대한 데이터 계보를 표현할 수 있습니다. 해당 구성은 Airflow 버전 2.9.2에서 수행하였습니다.
Amazon Managed Workflows for Apache Airflow(MWAA) 플러그인 구성
MWAA에서 OpenLineage 구성을 위해서는 OpenLineage에 대한 플로그인 구성을 아래와 같이 진행합니다.
1. AWS 콘솔 → MWAA → 환경 → 구성할 MWAA 이름 클릭합니다.
2. MWAA 콘솔 화면에서 Amazon S3의 DAG 코드 구성 시 사용한 S3 버킷을 확인합니다.
3. requirements.txt 파일을 아래와 같이 작성합니다. OpenLineage 구성 시 필요한 패키지를 지정해 줍니다. 지정 된 패키지가 플러그인 구성 시 MWAA에 설치 되게 됩니다.
4. env_var_plugin.py 파일을 아래와 같이 작성합니다. OpenLineage 이벤트를 어디로 보낼 지 설정하게 됩니다.
5. os.environ[“AIRFLOW__OPENLINEAGE__NAMESPACE”] 값은 MWAA 환경 이름과 같은 값을 사용합니다. 이 이름은 데이터 포털 리니지에서 네임스페이스 이름과 같아야 합니다.
6. “type”: “console” 으로 설정해야 CloudWatch Logs로 OpenLineage 이벤트가 저장됩니다.
7. 이벤트가 저장되는 CloudWatch Logs 그룹은 “Airflow 태스크 로그 그룹”에서 확인할 수 있습니다.
8. env_var_plugin.py 파일을 zip 파일로 압축합니다. 압축 파일명은 plugins.zip으로 압축합니다. Mac 환경에서 압축하게 되면 플러그인 로딩 시 문제가 발생합니다. Linux PC에서 압축하는 것을 추천합니다.
9. plugins.zip 파일과 requirements.txt 파일을 DAG 코드 구성 시 사용한 S3 버킷에 업로드 합니다.
MWAA 구성 편집
1. AWS 콘솔 → MWAA → 환경 → 구성할 MWAA 이름 클릭 → 편집 클릭합니다.
2. 세부 정보 지정 > Amazon S3의 DAG 코드 설정에서 아래와 같이 설정합니다.
3. 플러그인 파일: 위에서 S3에 추가한 plugins.zip 파일을 선택한 후 현재 버전을 선택합니다.
4. 요구 사항 파일: 위에서 S3에 추가한 requirements.txt 파일을 선택한 후 현재 버전을 선택합니다.
5. 다음을 클릭합니다.
6. 고급 설정 구성 > 모니터링에서 아래와 같이 설정합니다.
- Airflow 태스크 로그 enable, 로그 수준 INFO
- Airflow 웹 서버 로그 enable, 로그 수준 INFO
- Airflow 스케줄러 로그 enable, 로그 수준 INFO
- Airflow 작업자 로그 enable, 로그 수준 INFO
- Airflow DAG 프로세싱 로그 enable, 로그 수준 INFO
7. 다음을 클릭한 후 저장을 클릭합니다.
8. MWAA가 사용 가능한 상태까지 기달립니다.
9. 플러그인 설정이 성공하게 되면 Airflow UI에서 구성 된 정보를 확인할 수 있습니다.
10. Airflow UI > Admin > Plugins 화면에서 아래와 같이 설정 된 정보를 확인합니다.
CloudWatch Logs 파일 확인
1. Airflow 태스크 로그 그룹을 클릭합니다.
2. 로그 그룹 검색을 클릭합니다.
3. console.py로 검색을 수행합니다.
4. OpenLineage 형태로 CloudWatch에 저장 된 로그를 확인할 수 있습니다.
단계 2: Airflow 로그 Assume Role을 이용한 로그 공유
Airflow 로그를 Governance Account에 공유할 수 있는 AssumeRole을 생성합니다. 이 작업은 Production 계정(Airflow가 동작하고 있는)에서 수행합니다.
1. IAM → 역할 → 역할 생성
2. “사용자 지정 신뢰 정책” 선택
3. “사용자 지정 신뢰 정책” 아래와 같이 작성
4. 권한 추가에서 “CloudWatchReadOnlyAccess” 선택
5. 역할 이름에 “{Customer Name}-{Project Name}-iam-role-cloudwatch-share-log” 입력 후 생성
단계 3: CloudWatch 로그 쿼리
단계 2에서 공유한 Airflow 관련 CloudWatch 로그를 쿼리하기 위해 Assume Role을 얻어오는 코드를 아래와 같이 구성합니다.
CloudWatch에서 Airflow 로그를 가져오는 코드를 아래와 같이 구성합니다.
단계 4: 프로시저 정의 요청 쿼리
단계 3에서 가져온 CloudWatch 로그는 아래와 같은 형태입니다.
단계 5: OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석
‘query’에 있는 프로시저 이름을 가져와 프로시저 정의를 요청하는 PostgreSQL 시스템 함수를 호출합니다. 얻어온 프로시저 생성 정의문에서 Input Table 이름과 Output Table 이름을 얻어와야 합니다. 이를 위해서 OpenLineage SQL Parser 파이썬 패키지를 사용합니다.
OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석 코드를 실행하게 되면 아래와 같이 Input Table 이름과 Output Table 이름을 얻을 수 있다.
단계 6: 데이터 계보 작성 및 업데이트
Amazon DataZone에서는 OpenLineage 호환 이벤트를 사용하여 데이터 계보를 그릴 수 있습니다. Amazon DataZone 데이터 계보는 아래와 같이 표현 됩니다. Airflow 데이터 계보는 MWAA에서 생성 된 OpenLineage 데이터 계보 이벤트에 Input node와 output node만 추가하면 됩니다.
OpenLineage 이벤트 패킷 구성은 아래와 같습니다.
- eventTime: OpenLineage 이벤트가 발생한 시점을 나타내는 필드입니다. 이는 ISO 8601 형식의 타임스탬프로 기록되며, 이벤트가 생성된 정확한 시간을 포함합니다.
- eventType: 특정 이벤트의 유형을 나타냅니다. 예를 들어, 데이터 처리 작업이 시작되었는지, 완료되었는지, 실패했는지를 정의합니다. 주요 이벤트 유형에는 START, COMPLETE, FAIL, ABORT 등이 있으며, 이는 작업 실행의 상태를 나타냅니다
- job: 데이터 계보 처리 작업을 정의하며, 고유한 이름과 네임스페이스로 식별됩니다.
- run: 특정 Job이 실행되는 인스턴스를 나타내며, 시작 및 완료 시간과 같은 정보를 포함합니다. 각 Run은 고유한 ID(UUID)로 식별되며, 이는 Job의 동적 실행을 추적하는 데 사용됩니다.
- Inputs/Outputs: Inputs는 작업 실행 중 사용된 데이터셋(소스)을 나타내며, Outputs는 작업 결과로 생성된 데이터셋(대상)을 나타냅니다. 각 데이터셋은 네임스페이스와 이름으로 식별되며, 데이터 흐름과 변환 과정을 추적하는 데 중요한 역할을 합니다
- facets: Job, Run, Dataset 등의 엔티티에 부가적인 메타데이터를 추가할 수 있는 확장 가능한 필드입니다. 특정 모델이나 프로세스를 더 세부적으로 표현하기 위해 사용되며, 커스터마이징이 가능합니다. 예를 들어, 데이터셋의 스키마 정보나 변환 방식 등을 포함할 수 있습니다.
AWS DataZone에서 Input 테이블과 output 테이블로 등록 된 자산 정보를 조회하는 코드를 아래와 같이 사용합니다.
위 OpenLineage 이벤트 패킷 구성을 기반으로 Inputs 테이블 정보를 아래와 같이 추가합니다.
위 OpenLineage 이벤트 패킷 구성을 기반으로 Outputs 테이블 정보를 아래와 같이 추가합니다.
작성된 Airflow 데이터 계보를 확인합니다.
결론
이번 포스팅에서는 Amazon DataZone의 데이터 계보 기능을 활용하여 데이터 마트에서 많이 사용하는 Airflow에 대한 데이터 계보를 그리는 방법에 대해서 알아보았습니다. Airflow에서 데이터 계보를 사용하면 데이터 파이프라인의 신뢰성과 효율성을 보장하는 데 핵심적인 역할을 합니다. 데이터 계보를 통해 데이터의 출처와 변환 과정을 명확히 추적할 수 있어 데이터 품질 문제를 빠르게 식별하고 해결할 수 있습니다. 이는 시스템 다운타임을 줄이고 안정성을 높이는 데 기여합니다. 또한, 데이터 계보는 규제 준수 및 감사 요구사항을 충족하며, 데이터 파이프라인 변경 시 영향을 사전에 분석하여 위험을 최소화할 수 있도록 도와줄 수 있습니다. 이를 통해 조직은 데이터 거버넌스 정책을 강화하고, 데이터 소유권, 사용 권한, 보안 정책 등을 효과적으로 관리할 수 있습니다. 궁극적으로 데이터 계보는 복잡한 데이터 파이프라인 운영을 최적화하고, 데이터 기반 의사결정의 신뢰성과 조직의 데이터 활용 능력을 향상시킬 수 있습니다.