airflow 스케줄러는 DAG File을 실행하기 위해서 DagRun 객체로 인스턴스화 시킨다.
catchup
catchup default는 True이다.
catchup이 True이면 이전 execute date DAG 까지 다 실행한다.
False일 경우 가장 최근의 execute date DAG만 실행한다.
예시로 말하자면,
catchup이 True일 경우
test_dag 이라는 DAG File을 “0 0 * * *” 스케쥴 작업을 등록했다고 하자.
test_dag은 매일 0시 0분에 작업이 실행될 것이다.
test_dag을 등록한 날짜가 2022-01-01 02시라 했을 때,
test_dag이 첫번째로 실행되는 시간은 2022-01-02 00시 00분 이다. (execute date)
그다음 두번째로 실행 날짜는 2022-01-03 00시 00분 일 것이다.
두번째 실행까지 완료하고, 이 후 DAG의 task에 문제가 있어서, 2일동안 DAG을 비활성 해놓았다고 가정하자. (2022-01-04 ~ 2022-01-05 )
그럼 현재 날짜는 2022-01-05 00시 00분 이 후 일 것이다.
문제를 해결하고 세번째 작업이 정상적으로 다시 실행되도록 활성화 시키고, 2022-01-06 00시 00분에 세번째 작업이 실행된다.
이때, test_dag의 run execute date는 catchup이 True이면, 아래와 같이 보일 것이다.
2022-01-02
2022-01-03
2022-01-04
2022-01-05
2022-01-06
비활성 되있었던 기간까지 실행이 된 것이다. 이때, 특이점으로는 2022-01-04 ~ 2022-01-06까지 start date가 마지막 작업이 실행된 날짜 기준으로 동일하다는 것이다.
catchup이 False일 경우
test_dag 이라는 DAG File을 “0 0 * * *” 스케쥴 작업을 등록했다고 하자.
test_dag은 매일 0시 0분에 작업이 실행될 것이다.
test_dag을 등록한 날짜가 2022-01-01 02시라 했을 때,
test_dag이 첫번째로 실행되는 시간은 2022-01-02 00시 00분 이다. (execute date)
그다음 두번째로 실행 날짜는 2022-01-03 00시 00분 일 것이다.
두번째 실행까지 완료하고, 이 후 DAG의 task에 문제가 있어서, 2일동안 DAG을 비활성 해놓았다고 가정하자. (2022-01-04 ~ 2022-01-05)
그럼 현재 날짜는 2022-01-05 00시 00분 이 후 일 것이다.
세번째 작업이 정상적으로 다시 실행되도록 활성화 시키고, 2022-01-06 00시 00분에 세번째 작업이 실행된다.
이때, test_dag의 run execute date는 catchup이 False이면, 아래와 같이 보일 것이다.
2022-01-02
2022-01-03
2022-01-06
비활성 되있었던 기간은 실행 되지 않고 가장 최근의 excute date를 대상으로만 실행 된 것이다.
위 예시와 날짜가 조금 다르지만 (catchup이 True일 경우) 그림으로 설명한다면 아래와 같다.
+추가
만약, catchup이 False인 상태에서 이전 날짜들의 작업이 실패하여 정상적으로 동작하지 않았을 경우, 다시 강제로 DAG의 해당날짜를 돌리고 싶다면 아래와 같은 인터프리터 명령어를 통해 할 수 있다.
airflow backfill -s 2021-01-03 -e 2021-01-06 —rerun_failed_tasks -B <DAG NAME>
+참고
depends_on_past
default는 False이다.
default_args나 각 Operator에서 인자로 사용 가능하다.
True일 경우, upstream 작업이 문제가 있을 경우 다음 DagRun부터 해당 테스크 이하는 실행되지 않는다.
예를 들자면,
test_task1 >> test_task2 >> test_task3 순서인 DAG File이 있다고 하자.
각 Task는 단순히 print(”test_task”)를 출력한다.
depends_on_past가 True인 상태로 옵션을 주고.
DAG파일이 실행하게되면,
test_task1 >> test_task2 >> test_task3는 정상적으로 수행될 것이다.
이 상태에서, test_task2에 변화를 준다.
print를 빼고 일부러 Exception을 발생 시키는 코드를 넣는다.
그 후, DAG File을 다시 실행 시키면,
test_task1 >> test_task2(Failed) >> test_task3(upstream_failed)가 될 것이다.
이 상태에서 다시 모두 print를 출력하도록 정상동작하게 바꾼 후,
다시 DAG File을 실행 시키면,
test_task1 >> test_task2(no_status) >> test_task3(no_status) 상태로 된다.
test_task2에서 일부러 Exception을 띄웠던 부분과 depends_on_past 옵션이 True이기 때문에, 이전 Job에서 실패했던 Task이하는 스케줄러가 Queue에 해당 Task를 넣지 않아서이다.
만약 해당 Job을 실행시키고 싶다면, 이전 Job에서 해당 Task 부분(실패한)을 Success로 Mark해준다.
wait_for_downstream
wait_for_downstream이 True일 때,
이전 DagRun에서 test_task2가 실패했을 때,
다음으로 실행될 DagRun는 Running 상태이지만 no_status 상태가 된다.
이전 test_task2 이후, 작업들이 downstream으로 포함되어있기 때문에 해당 옵션으로 인해 작업이 진행되지 않는 것이다.
+참고
'Tech > Airflow' 카테고리의 다른 글
[Airflow] ValueError: unsupported pickle protocol: 5 (1) | 2022.12.13 |
---|---|
Airflow 멀티테넌시 환경 구축하기 (2) | 2022.10.09 |
[Airflow] Airflow 학습(3) (0) | 2022.05.01 |
[Airflow] Airflow 학습 (2) (0) | 2022.04.26 |
[Airflow] Airflow 학습 (1) (0) | 2022.04.24 |