본문 바로가기

TIL

TIL) Airflow Xcom

1. Airflow Xcom

Airflow : airbnb 에서 만든 오픈소스. workflow 를 쉽게 관리할 수 있는 툴이다.

예를 들어, 본인의 코드에 @Scheduled 에너테이션으로 관리하는 코드가 여기저기 흩뿌려져 있다면, airflow 를 통해 쉽게 관리할 수 있다.

또는, 순서가 정해져 있는 여러 Job 들을 주기적으로 실행해야 할 경우에 airflow 를 통해 쉽게 workFlow 를 정의하고, 실행하고, 스케줄링하고, 모니터링할 수 있다.

 

 

airflow 의 아키텍쳐는 아래와 같다.

https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html

 

  • Scheduler : Dag 와 Task 들을 관리한다. 등록된 시간에 등록된 Task 들을 실행하도록 스케줄링하고 모니터링한다.
  • WebServer : Airflow 에서 제공하는 관리용 Web Ui 서버.
  • Metadata Database : airflow 를 동작하기 위한 메타데이터를 저장하는 DB. (Dag 정보, Task 정보 등등)
  • Worker : Task 를 실행하는 워커. Executor 의 종류에 따라 Task 를 실행하는 방식이 다르다. (Celery Executor, K8S Executor .. )
  • DAG : 워크플로우

https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

 

버킷플레이스 Airflow 도입기 - 오늘의집 블로그

탁월한 데이터플랫폼을 위한 Airflow 도입기

www.bucketplace.co.kr

 

 

 

DAG : Directed Acyclic Graph. 단방향 그래프라는 뜻으로, 한쪽방향으로만 흐른다는 뜻이다.

여기서 목적어는 Task 이다.

Task 간 종속성과 관계를 구성하여 실행 방법을 정의한 것이다.

쉽게 말하면 DAG 는 뭐를 할 건지를 정의한 task 들의 flow 집합이다.

 

testDag.py

// 하나의 DAG 정의
myDag = DAG([dag 정의])

// DAG 의 Task 들 정의
task1 = PythonOperator(
	[첫번째 파이썬 테스크 정의],
    dag = myDag
)

task2 = MysqlOperator(
	[첫번째 파이썬 테스크 정의],
    dag = myDag
)

// 테스크의 순서 정의
task1 >> task2

 

 

 

 

Xcom : 하나의 Dag 에는 여러 task 가 있을 수 있다.

그런데 하나의 task 에서의 결과값을 다음 task 에서 사용하고 싶을 수 있다.

이때 airflow 에서는 Xcom 이라는 것을 사용해서 task 간에 메세지를 주고받을 수 있다.

 

Xcom 은 Dag 안에서 여러 Task 가 공유하는 { key : value } 형태의 변수라고 생가하면 된다.

전 Task 에서 값을 Push 하고, 다음 테스크에서 pull 해서 사용한다.

pythonOperator Task 의 경우 return 값이 자동으로 push 된다.

 

 

 

xcom 을 사용한 DAG 예제

dag = DAG(
    [dag 정의]
)

// 파이썬 코드에서 리턴으로 push
def return_push():
    return "value!@#"
    
// key, value 값 푸시
def xcom_push(**kwargs):
    key = "this_is_key"
    value = "this_is_value"
    kwargs['task_instance'].xcom_push(key=key, value=value)
    // kwargs['ti'] 로 써도 된다.
    
    return "value%^&"
    
    
// pull 의 2가지 방법
def xcom_pull(**kwargs):
    val1 = kwargs['ti'].xcom_pull(task_ids='task_no_1') // taskId 로 pull
    val2 = kwargs['ti'].xcom_pull(key='this_is_value') // key 값으로 pull
    val2 = kwargs['ti'].xcom_pull(task_ids='task_no_2') taskId 로 pull
    
// 한 DAG 안에서 이전 task1, 2 에서 저장된 xcom 은 task3 에서 모두 사용 가능

    
task1 = PythonOperator(
    task_id = 'task_no_1',
    python_callable = return_push,
    dag = dag
)

task2 = PythonOperator(
    task_id = 'task_no_2',
    python_callable = xcom_push,
    dag = dag
)

task3 = PythonOperator(
    task_id = 'task_no_3',
    python_callable = xcom_pull,
    dag = dag
)

task1 >> task2 >> task3

 

 

[주의점]

  • 큰 용량의 교환은 불가능. 필요하다면 해당 Task 에서 DB 에 저장후 다른 Task 에서 조회해서 쓰자.
  • Web UI 에서도 확인 가능하다. -> MetaDB 에 저장된다. 민감한 정보를 Xcom 으로 사용해서 괜히 노출시키지 말자.

 

 

 

 

 

[카카오 if airflow 발표]

https://if.kakao.com/session/55

 

if(kakao)2021

함께 나아가는 더 나은 세상

if.kakao.com

 

[xcom 참조]

https://dydwnsekd.tistory.com/107

 

Airflow Xcom 사용하기

이번 글에서는 Airflow Xcom에 대해서 알아보도록 하자. Airflow Version : 2.1.3에서 테스트를 진행했다. Xcom이란 Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, CeleryExecutor를 예로..

dydwnsekd.tistory.com

 

'TIL' 카테고리의 다른 글

TIL) Kotlin Test  (0) 2022.02.26
TIL) EPSILON, 몽고 업데이트시 다른 컬럼 값 참조, 몽고 덤프  (0) 2022.02.24
TIL) 인덱스, DFA  (0) 2022.02.23
TIL) Webserver vs WAS, NGNIX vs Apache  (0) 2022.02.22
TIL) associate 시리즈  (0) 2022.02.19