Airflow - Slack 알림 받기
포스트
취소

Airflow - Slack 알림 받기

Airflow에서 task의 성공 여부에 따라 slack으로 알림을 받는 방법을 알아보겠습니다.

Operator

kobis_operator

전체 코드는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# plugins/operators/kobis_operator.py
import sys
import warnings
import logging

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import requests
from datetime import datetime, timedelta
import pymysql

from airflow.models import BaseOperator


class KobisOperator(BaseOperator):
    def __init__(self, host, user, password, database, needCommit, *args, **kwargs):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.needCommit = needCommit
        super().__init__(*args, **kwargs)

    def execute(self, context):
        conn = pymysql.connect(
            user=self.user,
            passwd=self.password,
            host=self.host,
            db=self.database,
            charset="utf8",
        )

        # Target 날짜 설정
        date = (context.get("execution_date") - timedelta(days=1)).strftime("%Y%m%d")

        # API 호출
        rdict = {"key": "Your API Key", "targetDt": date}

        url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
        r = requests.get(
            url,
            params=rdict,
        )
        tmp = r.json()

        # MySQL DB 연결
        try:
            conn = pymysql.connect(
                user=self.user,
                passwd=self.password,
                host=self.host,
                db=self.database,
                charset="utf8",
            )
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            print("Successfully connected.")
        except:
            logging.error("Could not connected")
            sys.exit(1)

        # Data 수집
        boxoffice_json = tmp["boxOfficeResult"]["dailyBoxOfficeList"]

        boxoffice_dict = {}
        for i in range(len(boxoffice_json)):

            boxoffice_dict.update(
                {
                    "id": "a" + boxoffice_json[i]["movieCd"] + date[-2:],
                    "_name": boxoffice_json[i]["movieNm"].replace("\t", ""),
                    "target_date": datetime.strptime(date, "%Y%m%d").strftime(
                        "%Y-%m-%d"
                    ),
                    "open_date": np.where(
                        boxoffice_json[i]["openDt"] == " ",  # 개봉일이 " "로 되어있는 경우 존재.
                        "9999-01-01",
                        boxoffice_json[i]["openDt"],
                    ),
                    "_rank": boxoffice_json[i]["rank"],
                    "rank_inten": boxoffice_json[i]["rankInten"],
                    "rank_oldnew": boxoffice_json[i]["rankOldAndNew"],
                    "sales_amt": boxoffice_json[i]["salesAmt"],
                    "sales_inten": boxoffice_json[i]["salesInten"],
                    "sales_change": boxoffice_json[i]["salesChange"],
                    "sales_acc": boxoffice_json[i]["salesAcc"],
                    "audi_cnt": boxoffice_json[i]["audiCnt"],
                    "audi_inten": boxoffice_json[i]["audiInten"],
                    "audi_change": boxoffice_json[i]["audiChange"],
                    "audi_acc": boxoffice_json[i]["audiAcc"],
                    "scrn_cnt": boxoffice_json[i]["scrnCnt"],
                    "show_cnt": boxoffice_json[i]["showCnt"],
                }
            )

            table = "daily_boxoffice"
            data = boxoffice_dict

            placeholders = ", ".join(["%s"] * len(data))
            columns = ", ".join(data.keys())
            key_placeholders = ", ".join(["{0}=%s".format(k) for k in data.keys()])

            query = (
                "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s ;"
                % (
                    table,
                    columns,
                    placeholders,
                    key_placeholders,
                )
            )

            cursor.execute(query, list(data.values()) * 2)

        if self.needCommit:
            conn.commit()

일별 박스오피스 API 서비스

Kofic 영화진흥위원회의 API 서비스를 이용해 데이터를 수집합니다. 제공하는 서비스로는 ‘일별 박스오피스’, ‘주간/주말 박스오피스’, ‘영화 상세정보’ 등 영화와 관련된 정보들을 제공합니다. 그 중 ‘일별 박스오피스’ 정보를 json으로 받아서 이용하였으며, 조회하고자 하는 날짜는 ‘yyyymmdd’형식으로 입력해야 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
# Target 날짜 설정
date = (context.get("execution_date") - timedelta(days=1)).strftime("%Y%m%d")

# API 호출
rdict = {"key": "Your API Key", "targetDt": date}

url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
r = requests.get(
    url,
    params=rdict,
)
tmp = r.json()

MySQL DB 연결

pymysql 라이브러리를 이용해 미리 만들어 놓은 테이블에 연결합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
try:
    conn = pymysql.connect(
        user=self.user,
        passwd=self.password,
        host=self.host,
        db=self.database,
        charset="utf8",
    )
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    print("Successfully connected.")
except:
    logging.error("Could not connected")
    sys.exit(1)

Data 수집

response에는 해당 일자의 ‘박스오피스 순위’, ‘매출액’, ‘관객수’, ‘상영관’과 관련된 정보들이 있습니다. 개봉일 중 “ “로 되어있는 경우가 존재했는데, 이러한 경우 “9999-01-01”로 치환합니다. 해당 일자의 박스오피스 순위 및 필요한 정보들을 insert 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
boxoffice_json = tmp["boxOfficeResult"]["dailyBoxOfficeList"]

boxoffice_dict = {}
for i in range(len(boxoffice_json)):

    boxoffice_dict.update(
        {
            "id": "a" + boxoffice_json[i]["movieCd"] + date[-2:],
            "_name": boxoffice_json[i]["movieNm"].replace("\t", ""),
            "target_date": datetime.strptime(date, "%Y%m%d").strftime(
                "%Y-%m-%d"
            ),
            "open_date": np.where(
                boxoffice_json[i]["openDt"] == " ",  # 개봉일이 " "로 되어있는 경우 존재.
                "9999-01-01",
                boxoffice_json[i]["openDt"],
            ),
            "_rank": boxoffice_json[i]["rank"],
            "rank_inten": boxoffice_json[i]["rankInten"],
            "rank_oldnew": boxoffice_json[i]["rankOldAndNew"],
            "sales_amt": boxoffice_json[i]["salesAmt"],
            "sales_inten": boxoffice_json[i]["salesInten"],
            "sales_change": boxoffice_json[i]["salesChange"],
            "sales_acc": boxoffice_json[i]["salesAcc"],
            "audi_cnt": boxoffice_json[i]["audiCnt"],
            "audi_inten": boxoffice_json[i]["audiInten"],
            "audi_change": boxoffice_json[i]["audiChange"],
            "audi_acc": boxoffice_json[i]["audiAcc"],
            "scrn_cnt": boxoffice_json[i]["scrnCnt"],
            "show_cnt": boxoffice_json[i]["showCnt"],
        }
    )

    table = "daily_boxoffice"
    data = boxoffice_dict

    placeholders = ", ".join(["%s"] * len(data))
    columns = ", ".join(data.keys())
    key_placeholders = ", ".join(["{0}=%s".format(k) for k in data.keys()])

    query = (
        "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s ;"
        % (
            table,
            columns,
            placeholders,
            key_placeholders,
        )
    )

    cursor.execute(query, list(data.values()) * 2)

if self.needCommit:
    conn.commit()

slack_operator

slack sdk

‘Slack’과 연동하기 위해서 slack_sdk 라이브러리를 이용합니다. 해당 라이브러리를 통해 ‘slack api’와 연결할 수 있습니다. API 이용에 필요한 token 값은 slack api 사이트에서 어플리케이션을 만든 후 확인할 수 있습니다.

slack_bot_operator

slack_sdk를 이용해 봇이 메세지를 보내는 operator를 만들 수 있습니다. 메시지를 받길 원하는 채널에 봇을 초대하여, 아래와 같은 operator를 이용해 원하는 메시지를 전송할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# plugins/operators/slack_bot_operator.py
from airflow.models import BaseOperator
from slack_sdk import WebClient


class SlackBotOperator(BaseOperator):
    def __init__(self, message, *args, **kwargs):
        self.message = message
        self.token = "your token!"
        self.channel = "your channel"
        super().__init__(*args, **kwargs)

    def execute(self, context):
        client = WebClient(token=self.token)
        response = client.chat_postMessage(channel=self.channel, text=self.message)

그리고 다음과 같은 두 함수를 이용하면 Task의 성공/실패에 따라 어떤 메시지를 전송할 것인지를 설정할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# plugins/utils/slack.py
from slack_sdk import WebClient
from operators.slack_bot_operator import SlackBotOperator


def slack_success_alert(context):
    alert = SlackBotOperator(
        task_id="slack_success",
        message="""
        :large_green_circle: Task success.
        *Task*: {task}  
        *Dag*: {dag}
        *Execution Time*: {exec_date}  
        *Log Url*: {log_url}
        """.format(
            task=context.get("task_instance").task_id,
            dag=context.get("task_instance").dag_id,
            exec_date=context.get("execution_date"),
            log_url=context.get("task_instance").log_url,
        ),
    )
    return alert.execute(context=context)


def slack_fail_alert(context):
    alert = SlackBotOperator(
        task_id="slack_failed",
        message="""
            :red_circle: Task Failed.
            *Task*: {task}  
            *Dag*: {dag}
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url}
            """.format(
            task=context.get("task_instance").task_id,
            dag=context.get("task_instance").dag_id,
            exec_date=context.get("execution_date"),
            log_url=context.get("task_instance").log_url,
        ),
    )
    return alert.execute(context=context)

DAG

on_success_callbackon_failure_callback에 위에서 만든 함수를 각각 지정해주어 task의 성공 여부에 따라 어떤 알림을 받을 것인지 설정할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# dags/kobis_dag.py
import pendulum
from datetime import datetime, timedelta

from airflow import DAG
from operators.kobis_operator import KobisOperator

from utils.slack import slack_success_alert
from utils.slack import slack_fail_alert


with DAG(
    dag_id="example_slack_operator",
    schedule_interval="0 20 * * *",
    start_date=pendulum.datetime(2022, 3, 1, tz="Asia/Seoul"),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
    default_args={
        "on_success_callback": slack_success_alert,
        "on_failure_callback": slack_fail_alert,
    },
    params={},
) as dag:
    t1 = KobisOperator(
        task_id="insert_data",
        host="your host",
        user="your username",
        password="your password!",
        database="your db",
        needCommit=True,
        dag=dag,
    )

    t1

if __name__ == "__main__":
    dag.cli()
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.