Apache Airflow는 워크플로우 스케쥴을 작성하고, 모니터링하기 위한 오픈 소스 워크플로우 관리 플랫폼입니다. 파이썬을 이용하여 보다 쉽고, 복잡한 파이프라인을 구성할 수 있게 합니다. Airflow를 이용해 MySQL에 데이터를 쌓는 작업을 자동화할 수 있습니다.
0. test DB 생성
test_airflow
DB를 생성한 후, 현재 날짜와 시간을 업데이트 해주는 간단한 쿼리를 작성합니다.
1
2
3
4
5
6
7
8
# test_airflow.sql
-- CREATE TABLE `test_db`.`test_airflow` (
-- `_id` INT NOT NULL AUTO_INCREMENT,
-- `updated_at` TIMESTAMP(16) NOT NULL,
-- PRIMARY KEY (`_id`));
INSERT INTO test_db.test_airflow VALUES (NULL,now());
1. MySQL Connection 이용
Mysql provider 패키지 설치
터미널 창에서 mysql-provider를 설치합니다.
1
pip install apache-airflow-providers-mysql
Airflow Connections
Airflow 메뉴 중 ‘admin → connections → mysql_default’에서 아래 정보를 입력합니다.
Connection Type : MySQL
Host : your host (e.g. localhost)
Login : your username (e.g. root)
Password : your password
Port : your port (e.g. 3306)
DAG 생성
Airflow provider에서 제공하는 MySqlOperator
를 이용해 DAG를 생성하는 코드는 아래와 같습니다. MySqlOperator
는 ‘쿼리문을 직접 작성’하거나, ‘sql 파일을 통해서’ 원하는 작업을 진행할 수 있습니다. 다만, 주의할 점은 sql 파일이 있는 ‘scripts’ 폴더는 DAG 파일과 동일한 경로에 있어야 하며, ‘commit’이 필요하다면 autocommit 옵션을 이용합니다.
다음과 같이 DAG을 만듭니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# test_mysql_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
with DAG(
"example_mysql",
start_date=datetime(2022, 2, 21),
default_args={"mysql_conn_id": "mysql_default"},
schedule_interval=timedelta(minutes=1),
tags=["example"],
catchup=False,
) as dag:
mysql_task = MySqlOperator(
task_id="insert_value",
autocommit=True,
sql="/scripts/test_airflow.sql",
# sql=r"""INSERT INTO test_db.test_airflow VALUES (NULL,now());""",
dag=dag,
)
mysql_task
if __name__ == "__main__":
dag.cli()
2. BaseOperator 이용
Airflow의 BaseOperator
와 pymysql 라이브러리를 이용해서도 MySQL과 동작하는 Operator를 만들 수 있습니다.
Operator 생성
우선, BaseOperator
를 이용해 mysql_operator 파일을 ‘plugins - operators’ 폴더에 생성합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# mysql_operator.py
from typing import Optional
from airflow.models import BaseOperator
import pymysql
class MySqlOperator(BaseOperator):
def __init__(
self,
host,
user,
password,
database,
needCommit,
query: Optional[str] = None,
sqlfile: Optional[bool] = False,
filepath: Optional[str] = None,
*args,
**kwargs
):
self.host = host
self.user = user
self.password = password
self.database = database
self.query = query
self.needCommit = needCommit
self.sqlfile = sqlfile
self.filepath = filepath
super().__init__(*args, **kwargs)
def execute(self, context):
conn = pymysql.connect(
user=self.user,
passwd=self.password,
host=self.host,
db=self.database,
charset="utf8",
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
if self.sqlfile:
query = open(self.filepath).read()
print(query)
else:
query = self.query
cursor.execute(query)
if self.needCommit:
conn.commit()
conn.close()
DAG 생성
위에서 만든 MySqlOperator
를 이용하여 쿼리를 실행하는 DAG를 만듭니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# test2_mysql_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from operators.mysql_operator import MySqlOperator
with DAG(
dag_id="example_mysql2",
start_date=datetime(2022, 2, 21),
schedule_interval=timedelta(minutes=1),
tags=["example"],
catchup=False,
) as dag:
mysql_task = MySqlOperator(
task_id="insert_value",
host="localhost",
user="root",
password="your password!",
database="test_db",
# query=r"""INSERT INTO test_db.test_airflow VALUES (NULL,now());""",
needCommit=True,
sqlfile=True,
filepath="airflow/dags/scripts/test_airflow.sql",
dag=dag,
)
mysql_task
if __name__ == "__main__":
dag.cli()