카일 스쿨 6회차

  • Hits
  • #0. 영양제
  • #1. Airflow

0. 영양제

  • 영양제/비타민을 왜 섭취해야 하는가
    • 최고의 건강 관리 : 균형잡힌 식사와 꾸준한 운동
    • 하지만... 꾸준한 운동..
    • 과로, 스트레스에 시달리는 경우 영양소 소모가 많음 => 부족한 부분 발생
    • 자취생 => 과일 못먹음 + 배달 => 균형 잡히지 못한 식사
    • 비타민 D => 햇빛을 봐야하는데 => 우린 일을 하네
  • 어떤 비타민을 먹어야 하는가
  • 비타민B
    • 육체 피로 회복
    • 학업, 취업, 업무 등으로 육체피로가 만성화 => 마그네슘과 같이 섞기
    • 저는 비맥스 메타를 먹고 있는데 효과 좋음
  • 마그네슘
    • 세포 에너지 생성에 관여
    • 마그네슘 부족시 피로, 근육통, 경련, 수면장애, 우울감 등
  • 루테인
    • 시세포가 밀집된 황반의 기능을 유지
  • 비타민C
    • 항산화 효능, 면역령 강화
  • 오메가3
    • 안구건조증에 효과적, 당뇨에 도움, 치매 방지 등
  • 다 먹으라는 것은 아니고 하나씩 찾아보고, 자신에게 맞는 것을 섭취!
  • 비타민을 어떻게 구입할 것인가
    • 아이허브 VS 로켓 쿠팡직구
      • 아이허브가 무조건 저렴한 것은 아니고, 로켓 쿠팡직구가 저렴한 경우도 있음
      • 아이허브는 40달러 이상 구매하면 무료 배송
  • 사실 제일 중요한 것은
    • 꾸준한 운동
    • 좋은 식습관
    • 영양제
    • 3위일체..!
  • 추천 자료
    • 약사가 들려주는 약이야기(고약사님) 유튜브
    • 약사가 들려주는 약이야기(고약사님) 인스타
    • 쿠마님 블로그

1. Airflow

  • 오늘 할 이야기
    • Airflow란?
    • Airflow Architecture
    • DAG
    • Airflow BashOperator, PythonOperator 사용하기
    • Jinja Template 사용하기
    • Airflow로 토이 ETL 파이프라인 만들기
  • 사실 더 하고싶지만.. 1시간은 생각보다 짧기 때문에 이정도만 :)
  • Apache Airflow란?
    • 에어비앤비에서 만든 Workflow Management Tool
      • Workflow : 일련의 Task들의 연결
    • 활용할 수 있는 포인트
      • 데이터 엔지니어링 : ETL 파이프라인
        • 데이터를 source에서 가져와서 데이터 마트, 데이터 웨어하우스 등에 저장
      • 머신러닝 엔지니어링
        • 머신러닝 모델 주기적인 학습(1주 간격), 예측(30분 간격)
        • 실시간 API가 아닌 Batch성 예측
      • 간단한 cron 작업
        • crontab에 특정 작업 반복 등을 실행
      • 여러 작업들의 연결성(의존성) 관리
        • 앞의 작업이 성공해야 뒤 작업을 하도록 설정
      • 여러가지 작업을 효율적으로 관리(시각화 등)
  • Apache Airflow의 장점
    • Python 기반
    • Scheduling : 특정 간격으로 계속 실행
    • Backfill : 과거 작업 실행
    • 특정 Task 실패시 => Task만 재실행 / DAG 재실행 등 실패 로직도 있음
    • 데이터 엔지니어링에서 많이 사용됨
    • Google Cloud Platform에 있는 대부분의 기능을 지원
    • Google Cloud Platform엔 Managed Service(관리형 서비스)인 Composer 존재
  • Airflow UI 설명
  • 각종 Task 연결(Graph View)
    • 빨간색 : failed => 앞에 작업들이 실패해서 뒤 작업 run_after_loop이 노란색 upstream_failed 되고 실행되지 않음
  • UTC
    • 협정 세계시로 1972년 1월 1일부터 시행된 국제 표준시
    • 서버에서 시간 처리할 땐, 거의 UTC를 사용함
    • 한국 시간은 UTC+9hour
    • Airflow에서 UTC를 사용하기 때문에, CRON 표시할 때 UTC 기준으로 작성
      • 예 : UTC 30 1 * * * => 한국은 30 10 * * * => 한국 오전 10시 30분
  • Airflow 실행

    • airflow webserver와 airflow scheduler 2개 실행해야 함
    • 터미널 1개에 webserver를 띄우고, command+t로 새로운 터미널을 띄워서 scheduler를 띄우기

      airflow webserver
      airflow scheduler
  • Airflow 실행해보기

    • tutorial DAG을 실행(Links 아래에 있는 재생 버튼 클릭)
    • 혹시 ValueError: unknown locale: UTF-8 에러가 날경우 ~/.zshrc 또는 ~/.bash_profile에 아래 설정 추가

        export LC_ALL=en_US.UTF-8
        export LANG=en_US.UTF-8
    • 그 후 터미널에서 아래 커맨드 실행하고 webserver 다시 실행

        source ~/.zshrc
        # 또는 source ~/.bash_profile
  • Airflow Architecture
    • Airflow Webserver
      • 웹 UI를 표현하고, workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인 등 가능
    • Airflow Scheduler
      • 작업 기준이 충족되는지 여부를 확인
      • 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
      • 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함
  • DAG
    • Airflow의 DAG으로 모델링됨
    • Directed Acyclic Graphs
    • 방향이 있는 비순환 그래프
    • 비순환이기 때문에 마지막 Task가 다시 처음 Task로 이어지지 않음
  • 코드로 보는 DAG
  • 1) Default Argument 정의

    • start_date가 중요! 과거 날짜를 설정하면 그 날부터 실행
    • retries, retry_delay : 실패할 경우 몇분 뒤에 재실행할지?
    • priority_weight : 우선 순위
    • 외에도 다양한 옵션이 있는데, 문서 참고

      default_args = {
        'owner': 'your_name',
        'depends_on_past': False,
        'start_date': datetime(2018, 12, 1),
        'email': ['your@mail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'priority_weight': 10,
        'end_date': datetime(2018, 12, 3),
        # end_date가 없으면 계속 진행함
      }
  • 2) DAG 객체 생성

    • 첫 인자는 dag_id인데 고유한 id 작성
    • default_args는 위에서 정의한 argument를 넣고
    • schedule_interval은 crontab 표현 사용

      • schedule_interval='@once'는 한번만 실행. 디버깅용으로 자주 사용
      • 5 4 * * * 같은 표현을 사용
      • 더 궁금하면 crontab guru 참고
      dag = DAG('bash_dag', default_args=default_args, schedule_interval='@once'))
  • 3) Operator로 Task 정의

    • Operator가 Instance가 되면 Task라 부름
    • BashOperator : Bash Command 실행
    • PythonOperator : Python 함수 실행
    • BigQueryOperator : BigQuery 쿼리 날린 후 Table 저장
    • 외에도 다양한 operator가 있고, operator마다 옵션이 다름
    • Airflow Document, Integration Operator 참고
    • mysql_to_hive 등도 있음

      task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
      
      task2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=2,
        dag=dag)
      
      task3 = BashOperator(
        task_id='pwd',
        bash_command='pwd',
        dag=dag)
  • 4) task 의존 설정

    • task1 후에 task2를 실행하고 싶다면
      • task1.set_downstream(task2)
      • task2.set_upstream(task1)
    • 더 편해지면서 >><< 사용 가능
    • task1 >> task2로 사용 가능
    • task1 >> [task2, task3]는 task1 후에 task2, task3 병렬 실행을 의미

      task1 >> task2
      task1 >> task3
  • 5) DAG 파일을 DAG 폴더에 저장해 실행되는지 확인
    • DAG 폴더에 넣고 바로 Webserver에 반영되진 않고 약간의 시간이 필요함
    • 수정하고 싶으면 ~/airflow/airflow.cfg에서 dagbag_import_timeout, dag_file_processor_timeout 값을 수정하면 됨
  • 6) 디버깅
    • DAG이 실행되는지 확인 => 실행이 안된다면 DAG의 start_date를 확인
    • 실행되서 초록색 불이 들어오길 기도
    • 만약 초록이 아닌 빨간불이면 Task를 클릭해서 View log 클릭
  • Airflow BashOperator 사용하기
    • 01-bash_operator.py 참고
    • 앞에서 예제로 보여준 BashOperator 내용을 타이핑해보기 (5분)
      • default_argument에서 start_date는 datetime(2019, 2, 13)
      • DAG의 schedule_interval은 0 10 * * * 입력
    • 파일명은 airflow_test.py
    • (따로 설정 안했다면) ~/airflow/dags에 저장하면 됨
      • dags 폴더가 없다면 생성
    • dags에 airflow_test.py 저장
    • 지금은 간단한 bash command를 사용했지만, bash로 파이썬 파일도 실행할 수 있으니 활용 포인트가 무궁무진함
    • 재실행하고 싶으면 Task 클릭 후 Clear 클릭
  • PythonOperator
    • 02-python_operator.py 참고
    • current_date를 받아서 한글로 요일 출력하는 함수 작성하기 (3분)
  • PythonOperator(task_id, python_callable, op_args, dag, provide_context, templates_dict)로 사용함
    • task_id는 task의 id(예 : print_current_date)
    • python_callable는 호출 수 있는 python 함수를 인자로 넣음
    • op_args : callable 함수가 호출될 때 사용할 함수의 인자
    • dag : DAG 정의한 객체 넣으면 됨
    • provide_context : True로 지정하면 Airflow에서 기본적으로 사용되는 keyword arguments 등이 사용 가능하게 됨
    • templates_dict : op_args 등과 비슷하지만 jinja template이 변환됨
  • 방금 작성한 PythonOperator의 아쉬운 점
    • 돌아간 Task의 로그를 보면 => 모두 같은 결과가 나옴
      • 언제 실행해도 무조건 datetime.now()를 사용해서 현재 날짜를 사용함
    • 어제 일자에서 이 함수를 실행했다면?
      • 2020-02-13는 목요일입니다가 출력되었을 것
    • 이런 경우 Python Code에서 시간에 대한 컨트롤을 가진 케이스
    • Python Code에서 컨트롤을 가지면 과거 작업을 돌리기 힘듬
    • Airflow에서 Date를 컨트롤하는게 좋음
    • 이럴 때 Airflow에서 제공되는 기본 context 변수 또는 Jinja Template 사용
      • Flask에서 Jinja Template을 사용함
  • Airflow의 기본 context 변수 사용하기

    • 03-python_operator_with_context.py 참고
    • PythonOperator에서 provide_context=True일 경우 사용 가능
    • kwargs에 값이 저장됨
    • 예를 들면

        provide_context=True로 지정하면 kwargs 다양한 값들이 저장됨
        {'dag': <DAG: python_dag_with_jinja>,
        'ds': '2020-02-10',
        'next_ds': '2020-02-11',
        'next_ds_nodash': '20200211',
        'prev_ds': '2020-02-09',
        'prev_ds_nodash': '20200209',
        'ds_nodash': '20200210',
        'ts': '2020-02-10T00:30:00+00:00',
        'ts_nodash': '20200210T003000',
        'ts_nodash_with_tz': '20200210T003000+0000',
        'yesterday_ds': '2020-02-09',
        'yesterday_ds_nodash': '20200209',
        'tomorrow_ds': '2020-02-11',
        'tomorrow_ds_nodash': '20200211',
        'end_date': '2020-02-10',
        'execution_date': <Pendulum [2020-02-10T00:30:00+00:00]> ...}
  • Jinja Template 사용하기
    • 04-python_operator_with_jinja.py 참고
    • "{{ ds }}" 이런 형태로 사용함 : execution_date
    • PythonOperator는 기본 context 변수 사용이 더 쉽지만, 다른 Operator는 Jinja Template이 편함
    • PythonOperator는 templates_dict에 변수를 넣어서 사용
    • Macros Default Variables Document에 정의되어 있음
  • Backfill
    • Context Variable이나 Jinja Template을 사용하면 Backfill을 제대로 사용할 수 있음
    • Backfill : 과거 날짜 기준으로 실행
    • airflow backfill -s START_DATE -e END_DATE dag_id
    • 아래 명령어를 입력해보고 Webserver에 가봅시다
airflow backfill -s 2020-01-05 -e 2020-01-10 python_dag_with_jinja
  • Airflow로 토이 ETL 파이프라인 만들기
    • 시나리오
      • Google Cloud Storage에 매일 하루에 1번씩 주기적으로 csv 파일이 저장됨
      • csv 파일을 BigQuery에 Load
      • BigQuery에서 쿼리를 돌린 후, 일자별로 사용량 쿼리해서 Table 저장
  • 설정 확인
    • APIs & Services - Create Credentials - Service account
    • Service account permissions에서 BigQuery Admin, Storage Admin
    • Create key (optional) 밑에 있는 CREATE KEY 클릭
      • JSON 선택하고 CREATE
    • 다운로드된 project_name-123123.json 확인
      • 이 Key는 매우 중요하니 꼭 잘 보관!!!(유출시 피해가 큼. 잘 모르면 그냥 삭제 추천)
  • Airflow Webserver - Admin - connection 이동
  • Google Cloud Storage에 데이터 업로드
    • Google Cloud Storage로 이동
    • CREATE BUCKET(이미 있다면 그거 사용해도 무방) 클릭
      • 지역은 그냥 Region, us-east1하고 나머지 그냥 다 Continue 클릭
      • 전 kyle-school bucket 만듬
    • https://github.com/zzsza/kyle-school/tree/master/week6/data 데이터 다운!
      • bike_data_20200209 ~ bike_data_20200212.csv
    • 방금 만든 Bucket의 data 폴더 안에 방금 받은 파일 업로드
  • GoogleCloudStorageToBigQueryOperator, BigQueryOperator 사용할 예정
  • GoogleCloudStorageToBigQueryOperator
    • 05-simple_etl.py 참고
    • file명은 일정한 특징이 있음. bikedata{date}.csv
    • schema_object : 스키마가 어떤 이름을 갖고, 어떤 타입인지 정의해둔 json
    • bucket : 우리가 만든 bucket
    • source_objects = Source Data
    • destination_project_dataset_table : 저장할 경로
    • 지금 bikedata{date} Table 형태로 저장하는데, 이 형태는 샤딩으로 저장한 Table
  • BigQueryOperator
    • 간단히 생각하면 쿼리를 날려서 Table에 저장
    • agg_query에 간단한 쿼리 작성함
    • BigQueryOperator는 destination_dataset_table만 잘 정의하면 됨
  • Webserver에서 이런 오류가 발생한다면

    • No module named 'googleapiclient'

      pip3 install --upgrade google-api-python-client
    • No module named 'airflow.gcp'

      pip3 install 'apache-airflow[gcp]'==1.10.3
    • 아마 웹서버쪽에서 werkzeug 다시 설치해야할 수 있음

      pip3 install werkzeug==0.15.1
  • Airflow Local에서 실행할 경우
    • SequentialExecutor : 동시에 1개만 처리 가능
      • DB : sqlite
    • 병렬로 돌리기 위해 postegre, redis 등을 붙임
    • 설치가 매우 복잡하고 까다롭기 때문에 Docker 등을 활용하면 좋음
    • 보통 회사엔 공용 Airflow가 띄워져 있고, 그걸 활용함
    • 바닥부터 해봤으니, Airflow 단순히 사용하는 것은 꽤 익숙할 것이라 생각
    • Jupyter Notebook으로 DAG 폴더를 연결하면, 친숙하게 사용할 수 있음
  • Already running on PID XXXX Error가 발생할 경우

    • Webserver가 제대로 종료되지 않은 상황

      kill -9 $(lsof -t -i:8080)

다음 카일 스쿨

  • 각종 개발 도구들 & MLOps 개론
    • 설문을 해주시면, 커리큘럼 개선에 도움이 됩니다
    • 지금까지 받은 설문은 다 받고 고민하고 있습니다
  • 참고 : 3월 13일은 데이터 그룹 워크샵으로 카일 스쿨이 없습니다