Apache Airflow - Workflow 관리 도구(1)


오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다.
일반적으로 데이터 ETL(Extract, Transform, Load) 과정을 통해 머신러닝 모델을 위한 Dataset을 만들곤 합니다. 또한 다양한 데이터베이스를 사용할 경우 한곳으로 모아서 작업을 해야하는 경우가 있습니다. 위와 같은 경우 여러개의 Sequential한 로직(앞의 output이 뒤의 input이 되는)이 존재하는데 이런 로직들을 한번에 관리할 필요가 있습니다!

관리할 로직이 적다면 CRON + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 점점 관리할 태스크들이 많아지면 헷갈리는 경우가 생겨 관리 툴을 사용하기로 다짐했습니다. 대표적인 도구로는 하둡 에코시스템에 우지(oozie), luigi같은 솔루션이 있습니다

Apache Airflow는 Java와 Python으로 코드를 작성할 수 있습니다! 따라서 데이터 분석을 하는 사람들이 쉽게 코드를 작성할 수 있습니다

Airflow 콘솔이 따로 존재해 Task 관리를 서버에서 들어가 관리하지 않아도 되고, 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용합니다!

또한 구글 클라우드 플랫폼(BigQuery, Dataflow)을 쉽게 사용할 수 있도록 제공되기 때문에! GCP를 사용하시면 반드시 사용할 것을 추천드립니다

1) Airflow 설치

pip install apache-airflow
gcp를 사용할 예정이면 pip install apache-airflow[gcp]
airflow initdb
airflow webserver -p 8080

위 명령어를 치면 airflow가 실행됩니다!
기본적으로 설치할 경우 ~/airflow에 폴더가 생기게 됩니다. 폴더 내부에 있는 파일을 설명드리겠습니다

  • airflow.cfg : Airflow 관련 설정
  • airflow.db : sqlite 데이터베이스
  • dags 폴더(없다면 mkdir dags로 생성) : 로직이 저장되는 곳입니다

localhost:8080으로 Airflow 콘솔로 접속해보겠습니다!

DAG는 Directed Acyclic Graph의 약자로 Airflow에선 workflow라고 설명하고 있습니다. Task의 집합체라고 생각하시면 될 것 같습니다
메인 화면엔 정의되어 있는 DAG들을 확인할 수 있습니다. 현재는 많은 example들이 보입니다. example을 보고싶지 않다면 airflow.cfg에서 load_examples = False로 지정해주면 됩니다
Schedule은 예정된 스케쥴로 cron 스케쥴의 형태와 동일하게 사용할 수 있습니다
Owner는 소유자를 뜻하는 것으로 Airflow에선 유저를 생성해 유저별 작업을 할당할 수 있음을 뜻합니다!
Recent Tasks/DAG Runs에 최근 실행된 Task들이 나타나며, 실행 완료된 것은 초록색! 재시도는 노란색! 실패는 빨간색으로 표시됩니다

2) Pipeline 정의

Airflow는 $AIRFLOW_HOME(default는 ~/airflow)의 dags 폴더 안에 python 파일을 넣어 관리합니다
Operator라는 것을 통해 Task를 정의합니다. Python Operator, Bash Operator, BigQuery Operator, Dataflow Operator 등등의 Operator가 있습니다

Operator 관련 자료는 링크를 참조해주세요!

각각의 Operator는 unique한 task_id를 가져야 하며, 오퍼레이터별 다른 파라미터를 가지고 있습니다

아래 코드를 dags 폴더 아래에 test.py로 넣어주세요!

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
    

# start_date를 현재날자보다 과거로 설정하면, 
# backfill(과거 데이터를 채워넣는 액션)이 진행됩니다

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill', # Only celery option
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# dag 객체 생성
dag = DAG('test', description='First DAG', 
          schedule_interval = '55 14 * * *', 
          default_args=default_args)


t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

# BashOperator를 사용
# task_id는 unique한 이름이어야 합니다
# bash_command는 bash에서 date를 입력한다는 뜻

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
t2.set_upstream(t1)
# t1.set_downstream(t2)와 동일한 표현입니다
# t1 >> t2 와 동일 표현
t3.set_upstream(t1)

DAG 객체 생성 -> Operator를 활용해 Task 작성 -> Task를 연결 하는 방식으로 코드를 작성하면 됩니다!

DAGs는 각 Workflow를 뜻하고 Operator는 DAG 내에서 정의되는 작업 함수입니다

쉘에서 airflow scheduler를 입력하면 정의된 DAG들이 실행됩니다!

3) Airflow 기본 명령어

airflow list_dags

  • airflow의 dags 폴더 아래에 *.py 파일을 넣은 후, 위 명령어를 입력하면 DAGs의 리스트를 알 수 있습니다
  • 여기에 나오는 DAGs의 이름은 코드에서 DAG객체를 생성할 떄 넣은 이름이 나타납니다

airflow list_tasks test

  • test라는 이름의 dags안에 있는 task들을 출력합니다

airflow list_tasks test --tree

  • test의 task들을 tree 형태로 보여줍니다

airflow test [DAG id] [Task id] [date] 예시 : airflow test test print_date 2017-10-01

  • DAG의 Task 단위로 test

airflow scheduler

  • test를 모두 완료한 후, 스케쥴러를 실행해줍니다. DAG 코드에 정의된 스케쥴을 실행합니다

airflow -h

  • airflow 관련 help 명령 출력

메인 화면에서 DAG의 이름을 클릭하면 Graph View로 볼 수 있습니다! Tree View를 누르면 아래와 같은 이미지로 변하게 됩니다

빨간색 네모 안에 있는 초록색 칸을 클릭하면 아래와 같은 설정이 나옵니다

각종 설정을 정의할 수 있고, Run, Ignore 등의 액션을 할 수 있습니다

Apache Airflow의 간단 사용법에 대해 작성해봤습니다! 다음 글에는 BigQuery Operator, Dataflow Operator에 대한 글을 작성하겠습니다 :)




© 2017. by Seongyun Byeon

Powered by zzsza