본문 바로가기

Tech/Celery

[Celery]Pytest로 Celery Task 테스트 코드 작성하기

pytest로 celery task에 대하여, Test Code를 작성하려고 했다.

한글로된 블로그, 문서를 찾아보려했으나 보이지가 않음...........

그래서 우여곡절 끝에 찾아서 필요에 맞게 구현함...

우선 구현한 코드 참고 Git 링크

http://github.com/kangprog/celery_pytest

 

GitHub - kangprog/celery_pytest: 파이썬 셀러리 동작을 Pytest로 Test 하기

파이썬 셀러리 동작을 Pytest로 Test 하기. Contribute to kangprog/celery_pytest development by creating an account on GitHub.

github.com

이번 글은 아래와 같은 순서로 진행된다.

  1. 기본적인 Celery 동작 구성
  2. pytest로 Celery Task 테스트 코드 구성하기
  3. pytest가 실행 될 때마다, Docker compose로 Celery Broker를 자동으로 띄우기
  4. 한 Class Test Code에서 동일한 docker container 사용하기


 

기본적인 Celery 동작 구성

celery를 구성하기 위해서는 아래 3개가 필요하다.

  1. 작업을 관리해 줄 Broker
  2. 작업을 가져갈 Worker
  3. 작업 그 자체 Task

순서대로 구성하는 방법을 설명하겠다.

작업을 관리해 줄 Broker 구성하기

먼저, Broker를 세팅 할 건데... Broker는 Redis, RabbitMQ, 등등.. 을 사용할 수 있으나,
각각의 장단점이 있음으로 필요에 맞게 쓰면된다.

참고 링크

https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/index.html

 

Backends and Brokers — Celery 5.2.1 documentation

This document describes the current stable version of Celery (5.2). For development docs, go here. Backends and Brokers Release 5.2 Date Nov 16, 2021 Celery supports several message transport alternatives. Broker Overview This is comparison table of the di

docs.celeryproject.org



이 글에서는 Redis를 Broker로 활용한다.
Redis를 이제 설치할건데...
Redis를 막 설치하고... 구축하고.. 귀찮으니까 Docker Image를 활용한다.
추가로 이후에, Docker Compose로 Broker를 자동으로 띄울 예정이니,
여기서 미리 Docker Compose로 redis를 구성한다.

  • Docker 설치 및 Docker Compose 설치는 생략

docker-compose.yml을 만들고 아래와 같이 작성하면 끝.
실행해보고 싶다면, docker-compose -f docker-compose.yml up -d 커맨드 입력

#
# Celery의 Broker 역할을 해 줄 Redis 실행
#

version: '3.4'

services:
  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"




작업을 가져갈 Worker 구성하기

먼저 Celery 라이브러리를 설치한다.

pip install celery

다음, celery_app.py를 만들고 아래와 같이 작성한다.
celery worker를 구동하고 싶다면, celery -A celery_app worker -l info -c 1 커맨드 입력
각 옵션에 대해서는... -l은 LogLevel, -c는 concurrency다. 자세한 내용은 생략

from celery import Celery


app = Celery(
    'app',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0',
    include=[
        'tasks.celery_task'
    ]
)




작업 그자체 Task 구성하기

Broker를 구축하고, Worker까지 다 만들었다면, 이제 실질적으로 작업을 할.. Task를 생성해야한다.
celery_app.py와 같은 경로에 만들어도 상관없으나,
이후 Task가 많아짐을 고려하여, Tasks라는 폴더를 만들고 그 안에 celery_task.py를 만든다.


celery_task.py의 내용은 아래와 같다.
작업은 간단하다.
work_task 함수의 인자로 1이 들어오면 return 1
work_task 함수의 인자로 2가 들어오면 return 2

from celery_app import app


@app.task
def work_task(number):
    if number == 1:
        print(f"input number is {number}")
        return number
    else:
        print(f"input number is {number}")
        return number

 

Celery를 구동하기 위해서 준비는 다 끝났다.
트리거만 없을뿐...

본 목적은 Pytest를 통해 Test Code를 구성하는거라, 기본 구성에서 Task를 동작 시켜볼 필요는 없지만.
구성이 잘되었는지, Celery Task가 잘 동작하는지 확인하기 위해서는 아래와 같이 한다.
(생략 가능)
main.py를 만든다.
Task를 실행하기 위해서는 apply_async 외, delay도 있지만 내용 생략

참고

https://docs.celeryproject.org/en/stable/userguide/calling.html

 

Calling Tasks — Celery 5.2.1 documentation

This document describes the current stable version of Celery (5.2). For development docs, go here. Calling Tasks This document describes Celery’s uniform “Calling API” used by task instances and the canvas. The API defines a standard set of execution

docs.celeryproject.org

from tasks.celery_task import work_task


if __name__ == "__main__":
    true_response_task = work_task.apply_async(args=[1])
    false_response_task = work_task.apply_async(args=[2])




pytest로 Celery Task 테스트 코드 구성하기

Celery 기본 동작 구성도 끝났고.. 본 목적인 Pytest를 사용하여 Test Code를 구성한다.
먼저, TestCode를 구성하기 위해서pytest, pytest-celery 라이브러리를 설치한다

pip install pytest
pip install pytest-celery

 

다음으로, pytest를 구동하기 위한 fixture들을 정의하기 위해 conftest.py를 만들고 작성한다.

  • 이 글에서는 하나의 Test Code를 작성하고 테스트 함으로써, conftest.py를 하나 만들었지만..
    하나의 conftest.py에 많은 fixture를 정의하면 테스트가 느려지는 등.. 관리 이슈 등.. 여러 문제 발생
    즉, conftest.py는 여러개로 나눌 수 있다를 참고하자...
import pytest


@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'redis://127.0.0.1/1',
        'result_backend': 'redis://127.0.0.1/1'
    }

 

다음, celery Task를 Test할 Test Code test_celery_work.py를 작성한다.

from tasks.celery_task import work_task


def test_celery_work_return_one(celery_app, celery_worker):
    assert work_task.delay(1).get(timeout=10) == 1
    assert work_task.apply_async(args=[1]).get(timeout=10) == 1


def test_celery_work_return_two(celery_app, celery_worker):
    assert work_task.delay(2).get(timeout=10) == 2
    assert work_task.apply_async(args=[2]).get(timeout=10) == 2



이때, 중요한점
test 함수 인자로 들어간 celery_app, celery_worker 이 것들.
celery worker를 pytest가 실행되는 세션 동안 실행시켜주는 부분이다.
만약, 이 부분을 넣지 않는다면, celery -A celery_app worker -l info -c 1 커맨드로 worker를 따로 실행시켜야한다...


위 방식을 진행함으로써, pytest 세션동안 실행되는 celery worker가 task를 load하기 위해서는,
아래와 같이 이전에 작성했던 celery_task.py를 수정해야한다.

from celery import shared_task # 추가되는 부분
from celery_app import app


@shared_task # 추가되는 부분
@app.task
def work_task(number):
    if number == 1:
        print(f"input number is {number}")
        return number
    else:
        print(f"input number is {number}")
        return number

 

이제 마지막으로, 이전에 작성한 docker-compose.yml으로 redis를 실행 시킨 후, pytest를 동작시키면 동작할 것이다.

  • pytest 구동은... pycharm > RUN/DEBUG Configurations 활용 했음.

 

pytest가 실행 될 때마다, Docker compose로 Celery Broker를 자동으로 띄우기

위 방식대로 하다보면 docker-compose로 redis를 수동으로 띄워야 하는 불편함이 있다.
pytest가 동작할 때마다 자동으로 띄우게 하자...

먼저, pytest-docker-compose 라이브러리를 설치하자.

참고

https://github.com/pytest-docker-compose/pytest-docker-compose

 

GitHub - pytest-docker-compose/pytest-docker-compose: Spin up Docker containers during your integration tests automatically!

Spin up Docker containers during your integration tests automatically! - GitHub - pytest-docker-compose/pytest-docker-compose: Spin up Docker containers during your integration tests automatically!

github.com

pip install pytest-docker-compose



다음, conftest.py에 아래 내용을 추가해야 한다.

pytest_plugins = ["docker_compose"]

@pytest.fixture(scope="function")
def wait_for_docker(function_scoped_container_getter):
    #
    # api server 같은 request가 필요한 Container라면,
    # 여기서 url 리턴하는 로직이 추가되면 된다.
    #
    # yield를 넣어봤으나, pass와 별다를 점이 없다.
    # class test에서 mark.fixtures로 들어가서 해당 class test가 끝날때까지는 유지되는듯?
    #
    pass

 

이제 테스트 코드를 좀 변경해야하는데...
이전에 작성한 test_celery_work.py는 함수형으로만 내부가 구성되어있었다.
Class 케이스도 넣어서 수정해주자...

class TestCeleryWork:
    def test_celery_work_return_one(
            self,
            wait_for_docker,
            celery_config,
            celery_app,
            celery_worker
    ):

        assert work_task.delay(1).get(timeout=10) == 1
        assert work_task.apply_async(args=[1]).get(timeout=10) == 1


def test_celery_work_return_two(
        wait_for_docker,
        celery_config,
        celery_app,
        celery_worker
):
    assert work_task.delay(2).get(timeout=10) == 2
    assert work_task.apply_async(args=[2]).get(timeout=10) == 2



"""
[참고] 위처럼 해도 되고,이렇게 해도 된다.

@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
class TestCeleryWork():
    def test_celery_work_return_one(self):
        assert work_task.delay(1).get(timeout=10) == 1
        assert work_task.apply_async(args=[1]).get(timeout=10) == 1

@pytest.mark.usefixtures('celery_session_app')
@pytest.mark.usefixtures('celery_session_worker')
def test_celery_work_return_two():
    assert work_task.delay(2).get(timeout=10) == 2
    assert work_task.apply_async(args=[2]).get(timeout=10) == 2
"""

 

이제 위 상태로 pytest를 구동하면,
수동으로 docker-compose를 통해 redis를 띄우지 않고도, pytest가 시작됨과 동시에 redis가 자동으로 띄워져
celery task가 구동될 것이다.


근데.. 여기서 문제가있다.
test_celery_work_return_one에서 실행된 redis와 test_celery_work_return_two에서 실행된 redis는 서로 다른 컨테이너이다.
wait_for_docker가 function fixture임으로 호출 될 때마다, 컨테이너를 생성하고, 해당 함수가 끝나면 삭제하는 것을 반복해서이다.


만약 테스트코드를 아래와 같이 변경한다면 어떻게 될까?

class TestCeleryWork:
    def test_celery_work_return_one(
            self,
            wait_for_docker,
            celery_config,
            celery_app,
            celery_worker
    ):
        assert work_task.delay(1).get(timeout=10) == 1
        assert work_task.apply_async(args=[1]).get(timeout=10) == 1

    def test_celery_work_return_two(
            self,
            wait_for_docker,
            celery_config,
            celery_app,
            celery_worker
    ):
        assert work_task.delay(2).get(timeout=10) == 2
        assert work_task.apply_async(args=[2]).get(timeout=10) == 2



본 목적은 class TestCeleryWork 안에서 동작하는 모든 테스트 코드들이 하나의 Redis를 통해 테스트를 하고싶었지만,
매번 테스트 함수들이 실행될 때마다, 생성-삭제를 반복하여 목적대로 동작하지 않을 것이다.

그럼 이제 어떻게 하나의 Redis로 여러 테스트 함수가 공유하며 테스트를 진행할 수 있을지 알아보자.

한 Class Test Code에서 동일한 docker container 사용하기

pytest의 fixture의 scope는 여러개가 있다.

function, class, module, package, session

범위 크기는 위 순서대로이며, session이 가장 큰 범주이다.


각 역할은 아래와 같다.

참고 링크

https://velog.io/@sangyeon217/pytest-fixture
function : fixture가 함수 단위로 1회 생성됨(default)
class : fixture가 클래스 단위로 1회 생성됨
module : fixture가 파일 단위로 1회 생성됨
package : fixture가 패키지 단위로 1회 생성됨
session : fixture가 test session동안 1회 생성됨

 

Pytest Fixture 기능

Pytest Fixture 에 대한 포스팅 입니다.

velog.io

docker container를 공용적으로 사용하기 위해서는 module scope를 활용한다.

먼저, conftest.py를 수정한다.

@pytest.fixture(scope="module") # 수정되는 부분
def wait_for_docker(module_scoped_container_getter): # 수정되는 부분
    #
    # api server 같은 request가 필요한 Container라면,
    # 여기서 url 리턴하는 로직이 추가되면 된다.
    #
    # yield를 넣어봤으나, pass와 별다를 점이 없다.
    # class test에서 mark.fixtures로 들어가서 해당 class test가 끝날때까지는 유지되는듯?
    #
    pass

 

다음으로, test_celery_work.py를 수정한다.

@pytest.mark.usefixtures('wait_for_docker')
class TestCeleryWork:
    def test_celery_work_return_one(
            self,
            celery_config,
            celery_app,
            celery_worker
    ):
        assert work_task.delay(1).get(timeout=10) == 1
        assert work_task.apply_async(args=[1]).get(timeout=10) == 1

    def test_celery_work_return_two(
            self,
            celery_config,
            celery_app,
            celery_worker
    ):
        assert work_task.delay(2).get(timeout=10) == 2
        assert work_task.apply_async(args=[2]).get(timeout=10) == 2



이렇게 수정 후, pytest를 구동하면 하나의 컨테이너(redis)만 띄우고 해당 컨테이너에서 class 내 모든 테스트 함수가 동작하는 것을 볼 수 있다.

반응형

'Tech > Celery' 카테고리의 다른 글

Docker를 활용한 Celery Worker, beat 분리  (0) 2021.07.14