pytest로 celery task에 대하여, Test Code를 작성하려고 했다.
한글로된 블로그, 문서를 찾아보려했으나 보이지가 않음...........
그래서 우여곡절 끝에 찾아서 필요에 맞게
구현함...
우선 구현한 코드 참고 Git 링크
http://github.com/kangprog/celery_pytest
이번 글은 아래와 같은 순서로 진행된다.
- 기본적인 Celery 동작 구성
- pytest로 Celery Task 테스트 코드 구성하기
- pytest가 실행 될 때마다, Docker compose로 Celery Broker를 자동으로 띄우기
- 한 Class Test Code에서 동일한 docker container 사용하기
기본적인 Celery 동작 구성
celery를 구성하기 위해서는 아래 3개가 필요하다.
순서대로 구성하는 방법을 설명하겠다.
작업을 관리해 줄 Broker
구성하기
먼저, Broker
를 세팅 할 건데... Broker는 Redis, RabbitMQ, 등등.. 을 사용할 수 있으나,
각각의 장단점이 있음으로 필요에 맞게 쓰면된다.
참고 링크
https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/index.html
이 글에서는 Redis
를 Broker로 활용한다.
Redis를 이제 설치할건데...
Redis를 막 설치하고... 구축하고.. 귀찮으니까 Docker Image를 활용한다.
추가로 이후에, Docker Compose로 Broker를 자동으로 띄울 예정이니,
여기서 미리 Docker Compose로 redis를 구성한다.
Docker 설치 및 Docker Compose 설치는 생략
docker-compose.yml
을 만들고 아래와 같이 작성하면 끝.
실행해보고 싶다면, docker-compose -f docker-compose.yml up -d
커맨드 입력
#
# Celery의 Broker 역할을 해 줄 Redis 실행
#
version: '3.4'
services:
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
작업을 가져갈 Worker
구성하기
먼저 Celery 라이브러리를 설치한다.
pip install celery
다음, celery_app.py
를 만들고 아래와 같이 작성한다.
celery worker를 구동하고 싶다면, celery -A celery_app worker -l info -c 1
커맨드 입력
각 옵션에 대해서는... -l은 LogLevel
, -c는 concurrency
다. 자세한 내용은 생략
from celery import Celery
app = Celery(
'app',
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0',
include=[
'tasks.celery_task'
]
)
작업 그자체 Task
구성하기
Broker를 구축하고, Worker까지 다 만들었다면, 이제 실질적으로 작업을 할.. Task
를 생성해야한다.
celery_app.py와 같은 경로에 만들어도 상관없으나,이후 Task가 많아짐을 고려
하여, Tasks라는 폴더를 만들고 그 안에 celery_task.py
를 만든다.
celery_task.py의 내용은 아래와 같다.
작업은 간단하다.
work_task 함수의 인자로 1이 들어오면 return 1
work_task 함수의 인자로 2가 들어오면 return 2
from celery_app import app
@app.task
def work_task(number):
if number == 1:
print(f"input number is {number}")
return number
else:
print(f"input number is {number}")
return number
Celery를 구동하기 위해서 준비는 다 끝났다.
트리거만 없을뿐...
본 목적은 Pytest를 통해 Test Code를 구성하는거라, 기본 구성에서 Task를 동작 시켜볼 필요는 없지만.
구성이 잘되었는지, Celery Task가 잘 동작하는지 확인하기 위해서는 아래와 같이 한다.(생략 가능)
main.py를 만든다.
Task를 실행하기 위해서는 apply_async
외, delay
도 있지만 내용 생략
참고
https://docs.celeryproject.org/en/stable/userguide/calling.html
from tasks.celery_task import work_task
if __name__ == "__main__":
true_response_task = work_task.apply_async(args=[1])
false_response_task = work_task.apply_async(args=[2])
pytest로 Celery Task 테스트 코드 구성하기
Celery 기본 동작 구성도 끝났고.. 본 목적인 Pytest를 사용하여 Test Code를 구성한다.
먼저, TestCode를 구성하기 위해서pytest
, pytest-celery
라이브러리를 설치한다
pip install pytest
pip install pytest-celery
다음으로, pytest를 구동하기 위한 fixture들을 정의하기 위해 conftest.py
를 만들고 작성한다.
- 이 글에서는 하나의 Test Code를 작성하고 테스트 함으로써, conftest.py를 하나 만들었지만..
하나의 conftest.py에 많은 fixture를 정의하면 테스트가 느려지는 등.. 관리 이슈 등.. 여러 문제 발생
즉, conftest.py는 여러개로 나눌 수 있다를 참고하자...
import pytest
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': 'redis://127.0.0.1/1',
'result_backend': 'redis://127.0.0.1/1'
}
다음, celery Task
를 Test할 Test Code
test_celery_work.py
를 작성한다.
from tasks.celery_task import work_task
def test_celery_work_return_one(celery_app, celery_worker):
assert work_task.delay(1).get(timeout=10) == 1
assert work_task.apply_async(args=[1]).get(timeout=10) == 1
def test_celery_work_return_two(celery_app, celery_worker):
assert work_task.delay(2).get(timeout=10) == 2
assert work_task.apply_async(args=[2]).get(timeout=10) == 2
이때, 중요한점
test 함수 인자로 들어간 celery_app
, celery_worker
이 것들.
celery worker를 pytest가 실행되는 세션 동안 실행시켜주는 부분이다.
만약, 이 부분을 넣지 않는다면, celery -A celery_app worker -l info -c 1
커맨드로 worker를 따로 실행시켜야한다...
위 방식을 진행함으로써, pytest 세션동안 실행되는 celery worker가 task를 load하기 위해서는,
아래와 같이 이전에 작성했던 celery_task.py
를 수정해야한다.
from celery import shared_task # 추가되는 부분
from celery_app import app
@shared_task # 추가되는 부분
@app.task
def work_task(number):
if number == 1:
print(f"input number is {number}")
return number
else:
print(f"input number is {number}")
return number
이제 마지막으로, 이전에 작성한 docker-compose.yml
으로 redis를 실행 시킨 후, pytest를 동작시키면 동작할 것이다.
- pytest 구동은... pycharm > RUN/DEBUG Configurations 활용 했음.
pytest가 실행 될 때마다, Docker compose로 Celery Broker를 자동으로 띄우기
위 방식대로 하다보면 docker-compose로 redis를 수동으로 띄워야 하는 불편함이 있다.
pytest가 동작할 때마다 자동으로 띄우게 하자...
먼저, pytest-docker-compose
라이브러리를 설치하자.
참고
https://github.com/pytest-docker-compose/pytest-docker-compose
pip install pytest-docker-compose
다음, conftest.py
에 아래 내용을 추가해야 한다.
pytest_plugins = ["docker_compose"]
@pytest.fixture(scope="function")
def wait_for_docker(function_scoped_container_getter):
#
# api server 같은 request가 필요한 Container라면,
# 여기서 url 리턴하는 로직이 추가되면 된다.
#
# yield를 넣어봤으나, pass와 별다를 점이 없다.
# class test에서 mark.fixtures로 들어가서 해당 class test가 끝날때까지는 유지되는듯?
#
pass
이제 테스트 코드를 좀 변경해야하는데...
이전에 작성한 test_celery_work.py
는 함수형으로만 내부가 구성되어있었다.
Class 케이스도 넣어서 수정해주자...
class TestCeleryWork:
def test_celery_work_return_one(
self,
wait_for_docker,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(1).get(timeout=10) == 1
assert work_task.apply_async(args=[1]).get(timeout=10) == 1
def test_celery_work_return_two(
wait_for_docker,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(2).get(timeout=10) == 2
assert work_task.apply_async(args=[2]).get(timeout=10) == 2
"""
[참고] 위처럼 해도 되고,이렇게 해도 된다.
@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
class TestCeleryWork():
def test_celery_work_return_one(self):
assert work_task.delay(1).get(timeout=10) == 1
assert work_task.apply_async(args=[1]).get(timeout=10) == 1
@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
def test_celery_work_return_two():
assert work_task.delay(2).get(timeout=10) == 2
assert work_task.apply_async(args=[2]).get(timeout=10) == 2
"""
이제 위 상태로 pytest를 구동하면,
수동으로 docker-compose를 통해 redis를 띄우지 않고도, pytest가 시작됨과 동시에 redis가 자동으로 띄워져
celery task가 구동될 것이다.
근데.. 여기서 문제가있다.test_celery_work_return_one
에서 실행된 redis와 test_celery_work_return_two
에서 실행된 redis는 서로 다른 컨테이너
이다.
wait_for_docker가 function fixture임으로 호출 될 때마다, 컨테이너를 생성하고, 해당 함수가 끝나면 삭제하는 것을 반복해서이다.
만약 테스트코드를 아래와 같이 변경한다면 어떻게 될까?
class TestCeleryWork:
def test_celery_work_return_one(
self,
wait_for_docker,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(1).get(timeout=10) == 1
assert work_task.apply_async(args=[1]).get(timeout=10) == 1
def test_celery_work_return_two(
self,
wait_for_docker,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(2).get(timeout=10) == 2
assert work_task.apply_async(args=[2]).get(timeout=10) == 2
본 목적은 class TestCeleryWork
안에서 동작하는 모든 테스트 코드들이 하나의 Redis를 통해 테스트를 하고싶었지만,
매번 테스트 함수들이 실행될 때마다, 생성-삭제를 반복하여 목적대로 동작하지 않을 것이다.
그럼 이제 어떻게 하나의 Redis로 여러 테스트 함수가 공유하며 테스트를 진행할 수 있을지 알아보자.
한 Class Test Code에서 동일한 docker container 사용하기
pytest의 fixture의 scope는 여러개가 있다.
function, class, module, package, session
범위 크기는 위 순서대로이며, session이 가장 큰 범주이다.
각 역할은 아래와 같다.
참고 링크
https://velog.io/@sangyeon217/pytest-fixturefunction
: fixture가 함수 단위로 1회 생성됨(default)class
: fixture가 클래스 단위로 1회 생성됨module
: fixture가 파일 단위로 1회 생성됨package
: fixture가 패키지 단위로 1회 생성됨session
: fixture가 test session동안 1회 생성됨
docker container를 공용적으로 사용하기 위해서는 module
scope를 활용한다.
먼저, conftest.py
를 수정한다.
@pytest.fixture(scope="module") # 수정되는 부분
def wait_for_docker(module_scoped_container_getter): # 수정되는 부분
#
# api server 같은 request가 필요한 Container라면,
# 여기서 url 리턴하는 로직이 추가되면 된다.
#
# yield를 넣어봤으나, pass와 별다를 점이 없다.
# class test에서 mark.fixtures로 들어가서 해당 class test가 끝날때까지는 유지되는듯?
#
pass
다음으로, test_celery_work.py
를 수정한다.
@pytest.mark.usefixtures('wait_for_docker')
class TestCeleryWork:
def test_celery_work_return_one(
self,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(1).get(timeout=10) == 1
assert work_task.apply_async(args=[1]).get(timeout=10) == 1
def test_celery_work_return_two(
self,
celery_config,
celery_app,
celery_worker
):
assert work_task.delay(2).get(timeout=10) == 2
assert work_task.apply_async(args=[2]).get(timeout=10) == 2
이렇게 수정 후, pytest를 구동하면 하나의 컨테이너(redis)만 띄우고 해당 컨테이너에서 class 내 모든 테스트 함수가 동작하는 것을 볼 수 있다.
'Tech > Celery' 카테고리의 다른 글
Docker를 활용한 Celery Worker, beat 분리 (0) | 2021.07.14 |
---|