Airflow를 운영하다가 생긴 에러중에 하나이며,
어떻게 조치했는지 기록한다.
내가 운영하는 Airflow의 환경이다.
helm chart version: 1.6.0 (official chart)
- airflow version : 2.3.0, python version 3.7
airflow worker version : 2.3.2-python3.8
EKS위에 airflow를 구축하고 kubernetes executor로 운영 중이다.
구축하는 과정에서, helm chart 1.6.0 (official chart)를 사용해서 설치했고,
worker의 경우는 image를 별도로 분리하여 2.3.2-python3.8로 사용하고 있었다.
위 환경에서,
triggerdagrun Operator를 사용하는 중에 ValueError: unsupported pickle protocol: 5 오류가 발생했다.
이 오류가 발생하게 되면, scheduler가 정상적으로 동작하지 않게 된다.
왜?
dag run을 생성하지 못한다. (무한 에러에 빠짐)
[참고] TriggerDagRunOperator 는 dag의 종속성을 구현하는 쉬운 방법입니다.
해당 operator를 사용하면 동일한 airflow 환경에서 다른 dag를 실행할 수 있습니다.
오류 디테일 내용은 아래와 같다.
[2022-12-13 04:38:54,313] {scheduler_job.py:768} INFO - Exited execute loop
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
args.func(args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
return f(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
_run_scheduler_job(args=args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
self._run_scheduler_loop()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 901, in _do_scheduling
self._start_queued_dagruns(session)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1055, in _start_queued_dagruns
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 256, in active_runs_of_dags
query = query.filter(cls.dag_id.in_(list(set(dag_ids))))
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1055, in <genexpr>
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/result.py", line 376, in iterrows
for row in self._fetchiter_impl():
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 120, in chunks
fetch = cursor._raw_all_rows()
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/result.py", line 400, in _raw_all_rows
return [make_row(row) for row in rows]
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/result.py", line 400, in <listcomp>
return [make_row(row) for row in rows]
File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1816, in process
return loads(value)
ValueError: unsupported pickle protocol: 5
해결 과정은 아래와 같았다.
1. 버그 재현
다른 동료로 부터, Airflow를 사용하는 도중에 에러 문의가 온 것 이었다.
먼저 비지니스 로직이 아닌, 가장 기본 적인 해당 동작으로도 문제가 발생하는지 재현을 해봤다.
사용한 테스트코드(dag file)은 아래와 같다.
# task1.py
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
dag = DAG(
dag_id='task1',
start_date=datetime(2022, 12, 12),
schedule_interval=None,
)
middle = DummyOperator(
task_id='middle',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
middle >> end
# task_trigger.py
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
dag = DAG(
dag_id='task_trigger',
start_date=datetime(2022, 12, 12),
schedule_interval=None,
tags=['trigger'],
)
start = DummyOperator(
task_id='start',
dag=dag
)
t1 = TriggerDagRunOperator(
trigger_dag_id='task1',
task_id='trigger',
execution_date='{{ execution_date }}',
wait_for_completion=True,
poke_interval=30,
reset_dag_run=True,
dag=dag
)
start >> t1
DummyOperator와 TriggerDagRunOperator의 가장 기본적인 구성으로 테스트 해봤는데,
동일하게 에러가 발생하는 것을 확인했다. (사용자의 잘못된 사용법에 대한 잘못이 아니다.)
2. 동일한 문제에 대하여 apache Airflow Github Issue에 등록되어 있는지?
https://github.com/apache/airflow/issues?q=+unsupported+pickle+protocol
비슷한 경우의 이슈들이 많았다.
대부분의 이슈 해결 내용으로는, 아래 2가지 방법을 제안하고 있었다.
- metastore DB 내 dag_pickle Table drop
- metastore DB 내 dag_run Table에서 문제되는 Task delete
실제로 scheduler의 상태를 정상적으로 돌려놓기 위해서는 위 방법으로 해결이 되었지만, 근본적인 해결방안은 아니다.
가장 도움이 된 이슈
https://github.com/apache/airflow/issues/13317
위 13317 이슈의 comment에서
python version에 따른 문제라고 하며, 3.8+ 이상 버전부터 pickle5를 지원하기 때문이라고 써 있었다.
내가 운영중인 airflow, python version들을 다시 봤고,
위 코멘트에서 언급한것 처럼, airflow 컴포넌트 간 사용하는 python 버전이 달랐다.
(python 버전의 차이로 발생할 문제에 대해서 고려하지 않고 이전에 그냥 worker python 버전을 3.8로 해놨었다.)
scheduler는 2.3.0 버전의 python 3.7
worker는 2.3.2 버전의 python 3.8
여기서 python 버전차이간에 발생하는걸로 보였고.
worker image를 기존 2.3.2-python3.8에서 2.3.2-python3.7로 바꾸어 다시 배포했다.
재배포 이후, 위에 버그를 재현할 때 썼던 dag를 재실행 해보니, 정상적으로 operator가 동작하는 것을 확인 할 수 있었다.
'Tech > Airflow' 카테고리의 다른 글
Airflow 멀티테넌시 환경 구축하기 (2) | 2022.10.09 |
---|---|
[Airflow] catchup, depends_on_past, wait_for_downstream (0) | 2022.05.08 |
[Airflow] Airflow 학습(3) (0) | 2022.05.01 |
[Airflow] Airflow 학습 (2) (0) | 2022.04.26 |
[Airflow] Airflow 학습 (1) (0) | 2022.04.24 |