본문 바로가기

Tech/Airflow

[Airflow] Airflow 학습(3)

DAG = Data PipeLine

Operator = Task

 

DAG을 하나 만들어보자.

먼저 DAG을 생성하기 위해 필요한 선언 부분이다.

(이부분은 공통(common) py로 관리하면 좋을듯?)

from datetime import datetime, timedelta
from airflow import DAG

default_args= {
    "owner": "airflow", #작업 생성자
    "email_on_failure": False, #작업 실패시 알림을 받을 이메일
    "email_on_retry": False, #작업 재시도시 알림을 받을 이메일
    "email": "admin@localhost.com",
    "retries": 1, #작업 실패시 재시도 횟수
    "retry_delay": timedelta(minutes=5), #다시 시도하기까지 지연시간

}

with DAG("test_pipeline",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        default_args=default_args,
        catchup=False) as dag:

 

위처럼 start_date를 2021년 1월 1일로 설정할 경우,

해당 Job이 2021년 1월 1일부터 동작하는 것이 아닌, 2021년 1월 2일부터 시작된다.

이유는 아래 사진처럼, 1월 2일 (1월 1일 00:00~23:59까지의 데이터를 다 갖춘 후)에 1월1일의 데이터 전부를 대상으로 작업이 진행 될 수 있기 때문이다. (Schedule_interval이 daily로 설정되어있기 때문에 1일 후 동작한다)

사진 출처:

 

버킷플레이스 Airflow 도입기 - 오늘의집 블로그

탁월한 데이터플랫폼을 위한 Airflow 도입기

www.bucketplace.co.kr

 

즉 스케쥴의 start_date = 1월1일은 스케줄이 등록된 시간

실제로 이후 작업이 실행되는 시간은 1월 2일이 되는 것이다.이때 execution_date = 1월 1일

 

다음으로 Operator를 사용하여 작업을 만들어야하는데,

Operator는 여러개가 존재한다.

 

Operators — Airflow Documentation

 

airflow.apache.org

 

에어플로우 공식 어플에서는 주로 쓰는 3개 오퍼레이션으로 아래 3개가 있다고 한다.

PythonOperator

BashOperator

EmailOperator

외에 Documentation에 더많은 Operator가 있다.

 

사용할 수있는 오퍼레이터는 여러개가 있지만 크게 3분류로 나눈다면 Action, Transfer, Sensor로 나눌 수 있다.

Action Operator → ex.) Bash, Python

Transfer Operator → ex.) Postgres

Sensor Operator → default 60s ex.) 해당 위치에 파일이 있는지 확인한다.

 

기본적으로 DAG을 생성하면서 Operator를 만들기 이전에 Airflow Document를 항상 보자

 

Home

Platform created by the community to programmatically author, schedule and monitor workflows.

airflow.apache.org

 

DAG파일을 만약 하나 만들고, Operator까지 하나 만들었다면,

DAG을 바로 실행하는 것이 아니라,

항상! Operator를 테스트 실행 먼저 해보자. (제일 중요)

airflow tasks test <DAG File Name> <Task Name> <Start-date> <End-date>

 

예시로 PythonOperator를 하나 해보자면.

먼저, PythonOperator를 공식 Documentation에서 찾는다.

 

airflow.operators.python — Airflow Documentation

 

airflow.apache.org

 

그 후, 어떻게 쓰는건지 확인 하고. 아래와 같이 작성 할 수 있을 것이다.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args= {
    "owner": "airflow", #작업 생성자
    "email_on_failure": False, #작업 실패시 알림을 받을 이메일
    "email_on_retry": False, #작업 재시도시 알림을 받을 이메일
    "email": "admin@localhost.com",
    "retries": 1, #작업 실패시 재시도 횟수
    "retry_delay": timedelta(minutes=5), #다시 시도하기까지 지연시간

}

def python_operator_test():
    return "A"

with DAG("test_pipeline",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        default_args=default_args,
        catchup=False) as dag:

    is_test_operator1 = PythonOperator(
        task_id="is_test_operator1",
        python_callable=python_operator_test
    )

 

작성을 다했다면,

DAG을 실행시키기 이전에 task에 대한 test를 먼저 실행한다.

airflow tasks test test_pipeline is_test_operator 2021-05-01

이처럼 다른 Operator들도 생성하면된다!

반응형