1. Airflow Xcom
Airflow : airbnb 에서 만든 오픈소스. workflow 를 쉽게 관리할 수 있는 툴이다.
예를 들어, 본인의 코드에 @Scheduled 에너테이션으로 관리하는 코드가 여기저기 흩뿌려져 있다면, airflow 를 통해 쉽게 관리할 수 있다.
또는, 순서가 정해져 있는 여러 Job 들을 주기적으로 실행해야 할 경우에 airflow 를 통해 쉽게 workFlow 를 정의하고, 실행하고, 스케줄링하고, 모니터링할 수 있다.
airflow 의 아키텍쳐는 아래와 같다.
- Scheduler : Dag 와 Task 들을 관리한다. 등록된 시간에 등록된 Task 들을 실행하도록 스케줄링하고 모니터링한다.
- WebServer : Airflow 에서 제공하는 관리용 Web Ui 서버.
- Metadata Database : airflow 를 동작하기 위한 메타데이터를 저장하는 DB. (Dag 정보, Task 정보 등등)
- Worker : Task 를 실행하는 워커. Executor 의 종류에 따라 Task 를 실행하는 방식이 다르다. (Celery Executor, K8S Executor .. )
- DAG : 워크플로우
DAG : Directed Acyclic Graph. 단방향 그래프라는 뜻으로, 한쪽방향으로만 흐른다는 뜻이다.
여기서 목적어는 Task 이다.
Task 간 종속성과 관계를 구성하여 실행 방법을 정의한 것이다.
쉽게 말하면 DAG 는 뭐를 할 건지를 정의한 task 들의 flow 집합이다.
testDag.py
// 하나의 DAG 정의
myDag = DAG([dag 정의])
// DAG 의 Task 들 정의
task1 = PythonOperator(
[첫번째 파이썬 테스크 정의],
dag = myDag
)
task2 = MysqlOperator(
[첫번째 파이썬 테스크 정의],
dag = myDag
)
// 테스크의 순서 정의
task1 >> task2
Xcom : 하나의 Dag 에는 여러 task 가 있을 수 있다.
그런데 하나의 task 에서의 결과값을 다음 task 에서 사용하고 싶을 수 있다.
이때 airflow 에서는 Xcom 이라는 것을 사용해서 task 간에 메세지를 주고받을 수 있다.
Xcom 은 Dag 안에서 여러 Task 가 공유하는 { key : value } 형태의 변수라고 생가하면 된다.
전 Task 에서 값을 Push 하고, 다음 테스크에서 pull 해서 사용한다.
pythonOperator Task 의 경우 return 값이 자동으로 push 된다.
xcom 을 사용한 DAG 예제
dag = DAG(
[dag 정의]
)
// 파이썬 코드에서 리턴으로 push
def return_push():
return "value!@#"
// key, value 값 푸시
def xcom_push(**kwargs):
key = "this_is_key"
value = "this_is_value"
kwargs['task_instance'].xcom_push(key=key, value=value)
// kwargs['ti'] 로 써도 된다.
return "value%^&"
// pull 의 2가지 방법
def xcom_pull(**kwargs):
val1 = kwargs['ti'].xcom_pull(task_ids='task_no_1') // taskId 로 pull
val2 = kwargs['ti'].xcom_pull(key='this_is_value') // key 값으로 pull
val2 = kwargs['ti'].xcom_pull(task_ids='task_no_2') taskId 로 pull
// 한 DAG 안에서 이전 task1, 2 에서 저장된 xcom 은 task3 에서 모두 사용 가능
task1 = PythonOperator(
task_id = 'task_no_1',
python_callable = return_push,
dag = dag
)
task2 = PythonOperator(
task_id = 'task_no_2',
python_callable = xcom_push,
dag = dag
)
task3 = PythonOperator(
task_id = 'task_no_3',
python_callable = xcom_pull,
dag = dag
)
task1 >> task2 >> task3
[주의점]
- 큰 용량의 교환은 불가능. 필요하다면 해당 Task 에서 DB 에 저장후 다른 Task 에서 조회해서 쓰자.
- Web UI 에서도 확인 가능하다. -> MetaDB 에 저장된다. 민감한 정보를 Xcom 으로 사용해서 괜히 노출시키지 말자.
[카카오 if airflow 발표]
https://if.kakao.com/session/55
[xcom 참조]
https://dydwnsekd.tistory.com/107
'TIL' 카테고리의 다른 글
TIL) Kotlin Test (0) | 2022.02.26 |
---|---|
TIL) EPSILON, 몽고 업데이트시 다른 컬럼 값 참조, 몽고 덤프 (0) | 2022.02.24 |
TIL) 인덱스, DFA (0) | 2022.02.23 |
TIL) Webserver vs WAS, NGNIX vs Apache (0) | 2022.02.22 |
TIL) associate 시리즈 (0) | 2022.02.19 |