Apache Airflow - Workflow 관리 도구(1)


  • 오늘은 Workflow Management Tool인 Apache Airflow 관련 포스팅을 하려고 합니다.
  • 일반적으로 데이터 ETL(Extract, Transform, Load) 과정을 통해 머신러닝 모델을 위한 Dataset을 만들곤 합니다. 또한 다양한 데이터베이스를 사용할 경우 한곳으로 모아서 작업을 해야하는 경우가 있습니다.
  • 위와 같은 경우 여러개의 Sequential한 로직(앞의 output이 뒤의 input이 되는)이 존재하는데 이런 로직들을 한번에 관리할 필요가 있습니다!

  • 관리할 로직이 적다면 CRON + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 점점 관리할 태스크들이 많아지면 헷갈리는 경우가 생겨 관리 툴을 사용하기로 다짐했습니다. 대표적인 도구로는 하둡 에코시스템에 우지(oozie), luigi같은 솔루션이 있습니다

  • Apache Airflow는 Java와 Python으로 코드를 작성할 수 있습니다! 따라서 데이터 분석을 하는 사람들이 쉽게 코드를 작성할 수 있습니다

  • Airflow 콘솔이 따로 존재해 Task 관리를 서버에서 들어가 관리하지 않아도 되고, 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용합니다!

  • 또한 구글 클라우드 플랫폼(BigQuery, Dataflow)을 쉽게 사용할 수 있도록 제공되기 때문에! GCP를 사용하시면 반드시 사용할 것을 추천드립니다

  • 이 글은 1.9.0 버전에서 작성되었습니다

1) Airflow 설치

pip install apache-airflow
gcp를 사용할 예정이면 pip install apache-airflow[gcp]
airflow initdb
airflow webserver -p 8080
  • 위 명령어를 치면 airflow가 실행됩니다!
  • 기본적으로 설치할 경우 ~/airflow에 폴더가 생기게 됩니다. 폴더 내부에 있는 파일을 설명드리겠습니다
    • airflow.cfg : Airflow 관련 설정
    • airflow.db : sqlite 데이터베이스
    • dags 폴더(없다면 mkdir dags로 생성) : 로직이 저장되는 곳입니다
  • localhost:8080으로 Airflow 콘솔로 접속해보겠습니다!

  • DAG는 Directed Acyclic Graph의 약자로 Airflow에선 workflow라고 설명하고 있습니다. Task의 집합체라고 생각하시면 될 것 같습니다
  • 메인 화면엔 정의되어 있는 DAG들을 확인할 수 있습니다. 현재는 많은 example들이 보입니다. example을 보고싶지 않다면 airflow.cfg에서 load_examples = False로 지정해주면 됩니다
  • Schedule은 예정된 스케쥴로 cron 스케쥴의 형태와 동일하게 사용할 수 있습니다
  • Owner는 소유자를 뜻하는 것으로 Airflow에선 유저를 생성해 유저별 작업을 할당할 수 있음을 뜻합니다!
  • Recent Tasks/DAG Runs에 최근 실행된 Task들이 나타나며, 실행 완료된 것은 초록색! 재시도는 노란색! 실패는 빨간색으로 표시됩니다

2) Pipeline 정의

  • Airflow는 $AIRFLOW_HOME(default는 ~/airflow)의 dags 폴더 안에 python 파일을 넣어 관리합니다
  • Operator라는 것을 통해 Task를 정의합니다. Python Operator, Bash Operator, BigQuery Operator, Dataflow Operator 등등의 Operator가 있습니다
  • Operator 관련 자료는 링크를 참조해주세요!
  • 각각의 Operator는 unique한 task_id를 가져야 하며, 오퍼레이터별 다른 파라미터를 가지고 있습니다
  • 아래 코드를 dags 폴더 아래에 test.py로 넣어주세요!

      from airflow import models
      from airflow.operators.bash_operator import BashOperator
      from datetime import datetime, timedelta
    	    
    	
      # start_date를 현재날자보다 과거로 설정하면, 
      # backfill(과거 데이터를 채워넣는 액션)이 진행됩니다
    	
      default_args = {
          'owner': 'airflow',
          'depends_on_past': False,
          'start_date': datetime(2017, 10, 1),
          'email': ['airflow@airflow.com'],
          'email_on_failure': False,
          'email_on_retry': False,
          'retries': 1,
          'retry_delay': timedelta(minutes=5)	}
    	
      # dag 객체 생성
      with models.DAG(
              dag_id='test', description='First DAG', 
            schedule_interval = '55 14 * * *', 
            default_args=default_args) as dag:
    	
    		
          t1 = BashOperator(
              task_id='print_date',
              bash_command='date',
              dag=dag)
    		
          # BashOperator를 사용
          # task_id는 unique한 이름이어야 합니다
          # bash_command는 bash에서 date를 입력한다는 뜻
    		
          t2 = BashOperator(
              task_id='sleep',
              bash_command='sleep 5',
              retries=3,
              dag=dag)
    		
          templated_command = """
    		    
          """
    		
          t3 = BashOperator(
              task_id='templated',
              bash_command=templated_command,
              params={'my_param': 'Parameter I passed in'},
              dag=dag)
    		
          # set_upstream은 t1 작업이 끝나야 t2가 진행된다는 뜻
          t2.set_upstream(t1)
          # t1.set_downstream(t2)와 동일한 표현입니다
          # t1 >> t2 와 동일 표현
          t3.set_upstream(t1)
    
  • DAG 객체 생성 -> Operator를 활용해 Task 작성 -> Task를 연결 하는 방식으로 코드를 작성하면 됩니다!
  • DAGs는 각 Workflow를 뜻하고 Operator는 DAG 내에서 정의되는 작업 함수입니다
  • 쉘에서 airflow scheduler를 입력하면 정의된 DAG들이 실행됩니다!

3) Airflow 기본 명령어

airflow list_dags

  • airflow의 dags 폴더 아래에 *.py 파일을 넣은 후, 위 명령어를 입력하면 DAGs의 리스트를 알 수 있습니다
  • 여기에 나오는 DAGs의 이름은 코드에서 DAG객체를 생성할 떄 넣은 이름이 나타납니다

airflow list_tasks test

  • test라는 이름의 dags안에 있는 task들을 출력합니다

airflow list_tasks test --tree

  • test의 task들을 tree 형태로 보여줍니다

airflow test [DAG id] [Task id] [date] 예시 : airflow test test print_date 2017-10-01

  • DAG의 Task 단위로 test

airflow scheduler

  • test를 모두 완료한 후, 스케쥴러를 실행해줍니다. DAG 코드에 정의된 스케쥴을 실행합니다

airflow -h

  • airflow 관련 help 명령 출력

  • 메인 화면에서 DAG의 이름을 클릭하면 Graph View로 볼 수 있습니다! Tree View를 누르면 아래와 같은 이미지로 변하게 됩니다

  • 빨간색 네모 안에 있는 초록색 칸을 클릭하면 아래와 같은 설정이 나옵니다

  • 각종 설정을 정의할 수 있고, Run, Ignore 등의 액션을 할 수 있습니다

  • Apache Airflow의 간단 사용법에 대해 작성해봤습니다! 다음 글에는 BigQuery Operator, Dataflow Operator에 대한 글을 작성하겠습니다 :)


이 글이 도움이 되셨다면 공감 및 광고 클릭을 부탁드립니다 :)




© 2017. by Seongyun Byeon

Powered by zzsza