Airflow - MySQL operator
포스트
취소

Airflow - MySQL operator

Apache Airflow는 워크플로우 스케쥴을 작성하고, 모니터링하기 위한 오픈 소스 워크플로우 관리 플랫폼입니다. 파이썬을 이용하여 보다 쉽고, 복잡한 파이프라인을 구성할 수 있게 합니다. Airflow를 이용해 MySQL에 데이터를 쌓는 작업을 자동화할 수 있습니다.


0. test DB 생성

test_airflow DB를 생성한 후, 현재 날짜와 시간을 업데이트 해주는 간단한 쿼리를 작성합니다.

1
2
3
4
5
6
7
8
# test_airflow.sql

-- CREATE TABLE `test_db`.`test_airflow` (
--   `_id` INT NOT NULL AUTO_INCREMENT,
--   `updated_at` TIMESTAMP(16) NOT NULL,
--   PRIMARY KEY (`_id`));

INSERT INTO test_db.test_airflow VALUES (NULL,now());

1. MySQL Connection 이용

Mysql provider 패키지 설치

터미널 창에서 mysql-provider를 설치합니다.

1
pip install apache-airflow-providers-mysql

Airflow Connections

Airflow 메뉴 중 ‘admin → connections → mysql_default’에서 아래 정보를 입력합니다.

Connection Type : MySQL
Host : your host (e.g. localhost)
Login : your username (e.g. root)
Password : your password
Port : your port (e.g. 3306)

DAG 생성

Airflow provider에서 제공하는 MySqlOperator를 이용해 DAG를 생성하는 코드는 아래와 같습니다. MySqlOperator는 ‘쿼리문을 직접 작성’하거나, ‘sql 파일을 통해서’ 원하는 작업을 진행할 수 있습니다. 다만, 주의할 점은 sql 파일이 있는 ‘scripts’ 폴더는 DAG 파일과 동일한 경로에 있어야 하며, ‘commit’이 필요하다면 autocommit 옵션을 이용합니다.

다음과 같이 DAG을 만듭니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# test_mysql_dag.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator

with DAG(
    "example_mysql",
    start_date=datetime(2022, 2, 21),
    default_args={"mysql_conn_id": "mysql_default"},
    schedule_interval=timedelta(minutes=1),
    tags=["example"],
    catchup=False,
) as dag:
    mysql_task = MySqlOperator(
        task_id="insert_value",
        autocommit=True,
        sql="/scripts/test_airflow.sql",
        # sql=r"""INSERT INTO test_db.test_airflow VALUES (NULL,now());""",
        dag=dag,
    )

    mysql_task

if __name__ == "__main__":
    dag.cli()

2. BaseOperator 이용

Airflow의 BaseOperator와 pymysql 라이브러리를 이용해서도 MySQL과 동작하는 Operator를 만들 수 있습니다.

Operator 생성

우선, BaseOperator를 이용해 mysql_operator 파일을 ‘plugins - operators’ 폴더에 생성합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# mysql_operator.py

from typing import Optional
from airflow.models import BaseOperator
import pymysql


class MySqlOperator(BaseOperator):
    def __init__(
        self,
        host,
        user,
        password,
        database,
        needCommit,
        query: Optional[str] = None,
        sqlfile: Optional[bool] = False,
        filepath: Optional[str] = None,
        *args,
        **kwargs
    ):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.query = query
        self.needCommit = needCommit
        self.sqlfile = sqlfile
        self.filepath = filepath
        super().__init__(*args, **kwargs)

    def execute(self, context):
        conn = pymysql.connect(
            user=self.user,
            passwd=self.password,
            host=self.host,
            db=self.database,
            charset="utf8",
        )
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        if self.sqlfile:
            query = open(self.filepath).read()
            print(query)
        else:
            query = self.query
        cursor.execute(query)
        if self.needCommit:
            conn.commit()
        conn.close()

DAG 생성

위에서 만든 MySqlOperator를 이용하여 쿼리를 실행하는 DAG를 만듭니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# test2_mysql_dag.py

from datetime import datetime, timedelta

from airflow import DAG
from operators.mysql_operator import MySqlOperator

with DAG(
    dag_id="example_mysql2",
    start_date=datetime(2022, 2, 21),
    schedule_interval=timedelta(minutes=1),
    tags=["example"],
    catchup=False,
) as dag:
    mysql_task = MySqlOperator(
        task_id="insert_value",
        host="localhost",
        user="root",
        password="your password!",
        database="test_db",
        # query=r"""INSERT INTO test_db.test_airflow VALUES (NULL,now());""",
        needCommit=True,
        sqlfile=True,
        filepath="airflow/dags/scripts/test_airflow.sql",
        dag=dag,
    )

    mysql_task

if __name__ == "__main__":
    dag.cli()
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.