Airflow - Custom operator
포스트
취소

Airflow - Custom operator

Airflow는 다양한 Operator를 지원하지만, 필요한 Operator를 직접 만들 수도 있습니다. airflow.models.baseoperator.BaseOperator를 통해 이러한 확장성을 지원합니다.

Custom Operator 생성

BaseOperator에 2가지를 override하는 것으로 Custom Operator를 생성할 수 있습니다.

  • Constructor - 생성할 Operator의 parameter를 정의합니다.
  • Execute - Operator가 실행할 코드를 작성합니다.

Operator 생성

생성할 operator 파일을 만들어 ‘plugins-operators’ 폴더(‘custom-operator’ 등 폴더명을 다르게 하여 operator를 관리할 수 있음)에 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
# hello_operator.py
from airflow.models.baseoperator import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

DAG 생성

다음과 같이 DAG을 만듭니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# hello_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from operators.hello_operator import HelloOperator

with DAG(
    dag_id="example_hello",
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
) as dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar", dag=dag)

    hello_task

if __name__ == "__main__":
    dag.cli()

MongoDB Operator 생성

위와 같은 방법을 이용해 MongoDB에 데이터를 쌓는 DAG를 만들 수 있습니다.

test collection 생성

test_coll이라는 collection에 Date()함수를 이용해 현재 날짜와 시간을 저장합니다

1
2
3
db.createCollection("test_coll")

db.test_coll.insertOne({date:Date()})

Operator 생성

우선, BaseOperator를 이용해 ‘mongo_operator’ 파일을 ‘plugins - operators’ 폴더에 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from airflow.models import BaseOperator
from pymongo import MongoClient


class MongoCreateDocumentOperator(BaseOperator):
    def __init__(self, host, database, collection, data, *args, **kwargs):
        self.host = host
        self.database = database
        self.collection = collection
        self.data = data
        super().__init__(*args, **kwargs)

    def execute(self, context):
        mongo = MongoClient(host=self.host, port=27017)
        mongo[self.database][self.collection].insert_one(self.data)

DAG 생성

위에서 만든 MongoCreateDocumentOperator를 이용하여 쿼리를 실행하는 DAG을 만듭니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#test_mongo_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from operators.mongo_operator import MongoCreateDocumentOperator

with DAG(
    dag_id="example_mongo",
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2022, 2, 27),
    catchup=False,
    tags=["example"],
) as dag:
    mongo_task = MongoCreateDocumentOperator(
        task_id="insert_data",
        host="127.0.0.1",
        database="test",
        collection="test_coll",
        data={"date": datetime.today().strftime("%Y/%m/%d %H:%M:%S")},
        dag=dag,
    )
    mongo_task

if __name__ == "__main__":
    dag.cli()

Hooks

Hook을 통해 외부 resource에 연결할 수 있습니다. 또한, Airflow의 connection 정보를 이용하기 때문에 비밀번호와 같은 인증 정보를 파일에 작성하지 않아도 된다는 장점이 있습니다.

먼저, 터미널 창에서 mongo-provider를 설치합니다.

1
pip install apache-airflow-providers-mongo

그리고 Airflow 메뉴 중 ‘admin → connections → mongo_default’에서 아래 정보를 입력합니다.

Connection Type : MongoDB
Host : your host (e.g. localhost)
Schema : your schema (e.g. test)
Port : your port (e.g. 27017)

마지막으로 아래와 같이 Operator와 DAG을 구성하면 위에서 만든 MongoCreateDocumentOperator와 동일하게 현재 날짜와 시간을 document에 저장할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
### mongodb_operator.py
from datetime import datetime

from airflow.models.baseoperator import BaseOperator
from airflow.providers.mongo.hooks.mongo import MongoHook


class MongoDBOperator(BaseOperator):
    def __init__(self, conn_id: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.conn_id = conn_id

    def execute(self, context):
        hook = MongoHook(conn_id=self.conn_id)
        query = {"date": datetime.today().strftime("%Y/%m/%d %H:%M:%S")}
        result = hook.insert_one(
            mongo_collection="test_coll", doc=query, mongo_db="test"
        )
        message = f"result: {result}"
        print(message)
        return message
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
### test2_mongo_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from operators.mongodb_operator import MongoDBOperator

with DAG(
    dag_id="example_mongodb",
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
) as dag:
    mongo_task = MongoDBOperator(
        task_id="sample-task",
        conn_id="mongo_default",
        dag=dag,
    )

    mongo_task

if __name__ == "__main__":
    dag.cli()
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.