본문 바로가기

Tech/Airflow

[Airflow] catchup, depends_on_past, wait_for_downstream

airflow 스케줄러는 DAG File을 실행하기 위해서 DagRun 객체로 인스턴스화 시킨다.

 

catchup

catchup default는 True이다.

catchup이 True이면 이전 execute date DAG 까지 다 실행한다.

False일 경우 가장 최근의 execute date DAG만 실행한다.

 

예시로 말하자면,

catchup이 True일 경우

test_dag 이라는 DAG File을 “0 0 * * *” 스케쥴 작업을 등록했다고 하자.

 

test_dag은 매일 0시 0분에 작업이 실행될 것이다.

test_dag을 등록한 날짜가 2022-01-01 02시라 했을 때,

test_dag이 첫번째로 실행되는 시간은 2022-01-02 00시 00분 이다. (execute date)

 

그다음 두번째로 실행 날짜는 2022-01-03 00시 00분 일 것이다.

두번째 실행까지 완료하고, 이 후 DAG의 task에 문제가 있어서, 2일동안 DAG을 비활성 해놓았다고 가정하자. (2022-01-04 ~ 2022-01-05 )

그럼 현재 날짜는 2022-01-05 00시 00분 이 후 일 것이다.

 

문제를 해결하고 세번째 작업이 정상적으로 다시 실행되도록 활성화 시키고, 2022-01-06 00시 00분에 세번째 작업이 실행된다.

 

이때, test_dag의 run execute date는 catchup이 True이면, 아래와 같이 보일 것이다.

2022-01-02

2022-01-03

2022-01-04

2022-01-05

2022-01-06

 

비활성 되있었던 기간까지 실행이 된 것이다. 이때, 특이점으로는 2022-01-04 ~ 2022-01-06까지 start date가 마지막 작업이 실행된 날짜 기준으로 동일하다는 것이다.

 

catchup이 False일 경우

test_dag 이라는 DAG File을 “0 0 * * *” 스케쥴 작업을 등록했다고 하자.

test_dag은 매일 0시 0분에 작업이 실행될 것이다.

test_dag을 등록한 날짜가 2022-01-01 02시라 했을 때,

test_dag이 첫번째로 실행되는 시간은 2022-01-02 00시 00분 이다. (execute date)

 

그다음 두번째로 실행 날짜는 2022-01-03 00시 00분 일 것이다.

두번째 실행까지 완료하고, 이 후 DAG의 task에 문제가 있어서, 2일동안 DAG을 비활성 해놓았다고 가정하자. (2022-01-04 ~ 2022-01-05)

그럼 현재 날짜는 2022-01-05 00시 00분 이 후 일 것이다.

 

세번째 작업이 정상적으로 다시 실행되도록 활성화 시키고, 2022-01-06 00시 00분에 세번째 작업이 실행된다.

 

이때, test_dag의 run execute date는 catchup이 False이면, 아래와 같이 보일 것이다.

2022-01-02

2022-01-03

2022-01-06

비활성 되있었던 기간은 실행 되지 않고 가장 최근의 excute date를 대상으로만 실행 된 것이다.

위 예시와 날짜가 조금 다르지만 (catchup이 True일 경우) 그림으로 설명한다면 아래와 같다.

사진 출처:  https://medium.com/nerd-for-tech/airflow-catchup-backfill-demystified-355def1b6f92
사진 출처:  https://medium.com/nerd-for-tech/airflow-catchup-backfill-demystified-355def1b6f92

 

 

+추가

만약, catchup이 False인 상태에서 이전 날짜들의 작업이 실패하여 정상적으로 동작하지 않았을 경우, 다시 강제로 DAG의 해당날짜를 돌리고 싶다면 아래와 같은 인터프리터 명령어를 통해 할 수 있다.

airflow backfill -s 2021-01-03 -e 2021-01-06 —rerun_failed_tasks -B <DAG NAME>

+참고

 

Airflow Catchup & Backfill — Demystified

Airflow allows missed DAG Runs to be scheduled again so that the pipelines catchup on the schedules that were missed for some reason. It…

medium.com

 

 

depends_on_past

default는 False이다.

default_args나 각 Operator에서 인자로 사용 가능하다.

 

True일 경우, upstream 작업이 문제가 있을 경우 다음 DagRun부터 해당 테스크 이하는 실행되지 않는다.

 

예를 들자면,

test_task1 >> test_task2 >> test_task3 순서인 DAG File이 있다고 하자.

각 Task는 단순히 print(”test_task”)를 출력한다.

 

depends_on_past가 True인 상태로 옵션을 주고.

DAG파일이 실행하게되면,

test_task1 >> test_task2 >> test_task3는 정상적으로 수행될 것이다.

 

이 상태에서, test_task2에 변화를 준다.

print를 빼고 일부러 Exception을 발생 시키는 코드를 넣는다.

그 후, DAG File을 다시 실행 시키면,

test_task1 >> test_task2(Failed) >> test_task3(upstream_failed)가 될 것이다.

 

이 상태에서 다시 모두 print를 출력하도록 정상동작하게 바꾼 후,

다시 DAG File을 실행 시키면,

test_task1 >> test_task2(no_status) >> test_task3(no_status) 상태로 된다.

사진 출처:  https://eprj453.github.io/airflow/2021/03/13/DataEngineering-airflow-depends_on_past-wait_for_downstream/

 

 

test_task2에서 일부러 Exception을 띄웠던 부분과 depends_on_past 옵션이 True이기 때문에, 이전 Job에서 실패했던 Task이하는 스케줄러가 Queue에 해당 Task를 넣지 않아서이다.

 

만약 해당 Job을 실행시키고 싶다면, 이전 Job에서 해당 Task 부분(실패한)을 Success로 Mark해준다.

 

wait_for_downstream

wait_for_downstream이 True일 때,

이전 DagRun에서 test_task2가 실패했을 때,

다음으로 실행될 DagRun는 Running 상태이지만 no_status 상태가 된다.

이전 test_task2 이후, 작업들이 downstream으로 포함되어있기 때문에 해당 옵션으로 인해 작업이 진행되지 않는 것이다.

사진 출처:  https://eprj453.github.io/airflow/2021/03/13/DataEngineering-airflow-depends_on_past-wait_for_downstream/

 

+참고

 

 

[airflow] airflow - depends_on_past / wait_for_downstream

airflow task 설정을 하다가, 이전 task에 의존적으로 실행 계획을 만들 수 있는 옵션 2개를 찾았습니다. 이전 task에 상관없이 실행 가능한 모든 task를 실행시키는 경우에는 이 옵션들이 의미가 없겠

eprj453.github.io

 

 

Airflow에서 자주하는 실수들

Airflow 사용자들이 자주하는 실수들을 집어보고 해결책을 제시하고자 합니다.

velog.io

 

반응형

'Tech > Airflow' 카테고리의 다른 글

[Airflow] ValueError: unsupported pickle protocol: 5  (1) 2022.12.13
Airflow 멀티테넌시 환경 구축하기  (2) 2022.10.09
[Airflow] Airflow 학습(3)  (0) 2022.05.01
[Airflow] Airflow 학습 (2)  (0) 2022.04.26
[Airflow] Airflow 학습 (1)  (0) 2022.04.24