pbj0812의 코딩 일기

[자동화] Airflow BashOperator 사용하기 본문

빅데이터/자동화

[자동화] Airflow BashOperator 사용하기

pbj0812 2021. 1. 1. 04:40

0. 목표

 - Airflow BashOperator 사용하기

1. flow chart

 - python 으로 빈 csv 파일을 만들고 그 python 파일을 bash 가 돌리고 그 bash 파일을 Airflow 의 BashOperator 를 이용하여 구동

2. python 파일 생성(make_csv.py)

 - 해당 위치에 test.csv 라는 빈 csv 파일 생성

def result():
  f = open("/Users/pbj0812/Desktop/test_code/test_airflow/test.csv", "w")
  f.close()

if __name__ == "__main__":
  result()

3. bash 파일 생성(make_csv.sh)

 - 2. 의 make_csv.py 를 돌리도록 생성

python /Users/pbj0812/Desktop/test_code/test_airflow/make_csv.py

4. dag 파일 생성

 1) airflow 메인 화면에 들어가서 예제 파일을 하나 클릭

 2) Details 를 들어가 어디에 파일이 저장되어 있는지 확인하고 해당 위치에 작업 생성

 3) 해당 위치에 dags 폴더를 만들고 파일 생성(example_bash_operator_test.py)

  - 소유자는 admin

  - 1분씩 돌도록 설정

  - templated_command 변수를 생성하여 해당 bash 명령어를 입력

  - BashOperator 에서 task_id 를 생성하고 작업 내용은 위에서 만든 templated_command 로 사용

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'admin',
}

dag = DAG(
    dag_id='example_bash_operator_test',
    default_args=args,
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=1),
    schedule_interval='* * * * *',
)

templated_command = """
sh /Users/pbj0812/Desktop/test_code/test_airflow/make_csv.sh
"""

# [START howto_operator_bash]
run_this = BashOperator(
    task_id='bash_test',
    bash_command=templated_command,
    dag=dag,
)
# [END howto_operator_bash]

if __name__ == "__main__":
    dag.cli()

 4) airflow 메인화면에서 해당 dag 를 열어줌

 5) 생성된 csv 확인

 6) 실행시간 확인

 7) log 확인

Comments