본문 바로가기

Tech/Airflow

(6)
[Airflow] ValueError: unsupported pickle protocol: 5 Airflow를 운영하다가 생긴 에러중에 하나이며, 어떻게 조치했는지 기록한다. 내가 운영하는 Airflow의 환경이다. helm chart version: 1.6.0 (official chart) - airflow version : 2.3.0, python version 3.7 airflow worker version : 2.3.2-python3.8 EKS위에 airflow를 구축하고 kubernetes executor로 운영 중이다. 구축하는 과정에서, helm chart 1.6.0 (official chart)를 사용해서 설치했고, worker의 경우는 image를 별도로 분리하여 2.3.2-python3.8로 사용하고 있었다. 위 환경에서, triggerdagrun Operator를 사용하는 중에..
Airflow 멀티테넌시 환경 구축하기 나 같은 쭈니어 데이터엔지니어가 Airflow 멀티테넌시 환경을 구축하게 될 때, 도움이 되고자 구축했던 경험을 글로 남겨 공유한다.! 회사에서 신규 프로젝트로 Airflow를 처음으로 사용하게 되었다. 공식 Airflow Helm Chart를 사용하여 AWS EKS 위에, Kubernetes Executor로 구축하였다. 구축한 Airflow에서 DAG 파일을 관리하기 위해, GitSync(GitLab)를 사용하게 되었는데, 이부분에서 여러명의 데이터 엔지니어가 DAG 파일을 생성, 작업 하기엔 conflict issue 등 여러 불편함이 생겼다. 이 불편함을 해소하기 위해서, 여러 회사들의 방법을 벤치 마킹 했고, 그 과정에서 발견한 대표적인 방법들은 아래와 같았다. - Line: https://en..
[Airflow] catchup, depends_on_past, wait_for_downstream airflow 스케줄러는 DAG File을 실행하기 위해서 DagRun 객체로 인스턴스화 시킨다. catchup catchup default는 True이다. catchup이 True이면 이전 execute date DAG 까지 다 실행한다. False일 경우 가장 최근의 execute date DAG만 실행한다. 예시로 말하자면, catchup이 True일 경우 test_dag 이라는 DAG File을 “0 0 * * *” 스케쥴 작업을 등록했다고 하자. test_dag은 매일 0시 0분에 작업이 실행될 것이다. test_dag을 등록한 날짜가 2022-01-01 02시라 했을 때, test_dag이 첫번째로 실행되는 시간은 2022-01-02 00시 00분 이다. (execute date) 그다음 두번..
[Airflow] Airflow 학습(3) DAG = Data PipeLine Operator = Task DAG을 하나 만들어보자. 먼저 DAG을 생성하기 위해 필요한 선언 부분이다. (이부분은 공통(common) py로 관리하면 좋을듯?) from datetime import datetime, timedelta from airflow import DAG default_args= { "owner": "airflow", #작업 생성자 "email_on_failure": False, #작업 실패시 알림을 받을 이메일 "email_on_retry": False, #작업 재시도시 알림을 받을 이메일 "email": "admin@localhost.com", "retries": 1, #작업 실패시 재시도 횟수 "retry_delay": timedelta(..
[Airflow] Airflow 학습 (2) Airflow는 그럼 어떻게 설치하나? 아래 docker-compose.yml를 참고하자. https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml Airflow를 Docker로 실행하고, 웹 서버를 접속해보면 다음과 같은 화면을 볼 수 있다. Pause/Unpause DAG: 비활성/활성 버튼이다. DAG: DAG 이름이다. Owner: DAG 파일을 만든 소유자이다. 생성자 추적시 사용된다. Runs: DAG 파일의 실행 상태를 나타낸다. (Success, Running, Failed) Schedule: DAG 파일의 실행 주기이다. Last Run: DAG 파일이 가장 최근으로 동작한 시간이다. Recent Tasks: 현재..
[Airflow] Airflow 학습 (1) Airflow를 왜 사용하는가? 이처럼 데이터를 목적에 맞게 활용하기 위해서는 수집 → 처리 → 저장 절차를 거칠 것이다. 처리는 데이터가 수집이 되어야 가능하고, 저장은 저장할 처리된 데이터가 있어야 가능하다. 만약 여기서 데이터 수집에서 문제가 발생한다면? 처리 → 저장 과정은 진행되지 못 할 것이다. 단순히 하나의 데이터 파이프라인이라면, 장애를 파악하기 비교적 편할 것이다. 하지만 하나의 파이프라인이 아닌, x개의 데이터 파이프라인이 존재한다면? 데이터 파이프라인 개수가 많을 수록 장애 확인 및 처리에 시간이 많이 소요 될 것이다. Airflow를 활용한다면, 데이터 파이프라인이 많더라도, 데이터 파이프라인을 시각적 관리, 모니터링, 스케쥴링, 데이터 파이프라인 설정 등을 관리하기 편이하다. 그럼..