当前位置 : 主页 > 编程语言 > python >

Dolphinscheduler中Hive慢查询（耗时最长任务）告警

来源:互联网 收集:自由互联 发布时间:2022-06-15
1.Dolphinscheduler钉钉告警 import pymysql import json import requests import hmac import hashlib import time import base64 import urllib from DBUtils.PooledDB import PooledDB # 数据库连接池 需要执行安装 pip3 install DBUtils==1.3

1.Dolphinscheduler钉钉告警

import pymysql
import json
import requests
import hmac
import hashlib
import time
import base64
import urllib
from DBUtils.PooledDB import PooledDB

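# The DBUtils.PooledDB import above (database connection pool) requires: pip3 install DBUtils==1.3 (DBUtils 2.x renamed it to `from dbutils.pooled_db import PooledDB`)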
from apscheduler.schedulers.blocking import BlockingScheduler

from dolphinscheduler import analysis_processing

# Connection settings for the DBUtils PooledDB that get_data_fetchall borrows
# connections from (via POOL.connection()).
_POOL_OPTIONS = dict(
    creator=pymysql,   # DB-API module used to open the underlying connections
    maxconnections=6,  # max connections the pool allows (0/None = unlimited)
    mincached=2,       # idle connections opened at start-up (0 = none)
    maxcached=5,       # max idle connections kept in the pool (0/None = unlimited)
    # Max shared connections. DBUtils only shares connections when the creator
    # module's threadsafety > 1; pymysql reports 1, so this setting has no
    # effect and every connection is dedicated.
    maxshared=3,
    blocking=True,     # when exhausted: True = wait for a free connection, False = raise
    maxusage=None,     # max reuses of one connection (None = unlimited)
    setsession=[],     # SQL run at session start, e.g. ["set time zone ..."]
    # When to ping the server: 0 = never, 1 = when requested, 2 = on cursor
    # creation, 4 = on query execution, 7 = always.
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='*************',
    database='dolphinscheduler',
    charset='utf8',
)
POOL = PooledDB(**_POOL_OPTIONS)


def get_data_fetchall(sql, args=None):
    """Run a query on a connection borrowed from POOL and return all rows.

    :param sql: SQL statement to execute.
    :param args: optional query parameters forwarded to ``cursor.execute``.
        pymysql only applies %-interpolation when args is not None, so with
        the default a statement containing literal ``%`` (e.g. date_format
        patterns) is sent unchanged.
    :return: the result of ``cursor.fetchall()`` (presumably a tuple of row
        tuples with the default pymysql cursor), or None if the query failed;
        the error is printed.
    """
    connect = POOL.connection()
    try:
        cursor = connect.cursor()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            cursor.close()
    except Exception as exc:
        # Keep the original best-effort contract (print + None) but no longer
        # swallow KeyboardInterrupt/SystemExit as the bare except did.
        print("Error: unable to fetch data: %s" % exc)
        return None
    finally:
        # Always return the connection to the pool; previously this happened
        # only on the error path, leaking it after every successful query.
        connect.close()


def _format_time(value):
    """Return a DATETIME value as 'YYYY-MM-DD HH:MM:SS', or '-' when it is None (SQL NULL)."""
    if value is None:
        return '-'
    return value.strftime('%Y-%m-%d %H:%M:%S')


def main():
    """Send a DingTalk alert for each of today's ten longest-running SHELL task instances.

    Queries the DolphinScheduler metadata tables for SHELL task instances
    submitted since midnight (per MySQL ``now()``), longest duration first,
    prints the row count and sends one markdown message per row. A failed
    send is reported on stdout instead of being silently dropped.
    """
    # duration = (end_time - start_time) in hours, rounded to 2 decimals; it is
    # NULL when start_time or end_time is NULL (presumably a task that has not
    # started or finished yet).
    exec_sql = '''
SELECT
c.id,
c.NAME AS project_name,
b.NAME AS process_name,
a.NAME AS task_name,
a.submit_time,
a.start_time,
a.end_time,
round((unix_timestamp(a.end_time)-unix_timestamp(a.start_time))/3600,2) AS duration
FROM t_ds_task_instance a
LEFT JOIN t_ds_process_definition b ON a.process_definition_id=b.id
LEFT JOIN t_ds_project c ON b.project_id=c.id
WHERE a.submit_time>=date_format(now(),"%Y-%m-%d 00:00:00") AND a.task_type="SHELL"
ORDER BY duration DESC LIMIT 10
'''
    result_all = get_data_fetchall(exec_sql)
    if result_all is None:
        # get_data_fetchall returns None (after printing the error) when the
        # query fails; checking first avoids calling len() on None.
        return

    # Print the number of rows returned.
    sendtext = "任务慢查询 [ %d ]条数据:" % len(result_all)
    print(sendtext)

    for row in result_all:
        # Column order follows the SELECT list above; c.id is the project id
        # and is not used in the alert.
        (_project_id, project_name, process_name, task_name,
         submit_time, start_time, end_time, duration) = row

        alert_msg = " `【%s - 任务检测到慢查询!!!】` /n>**项目名称:** *%s* /n>**执行进程:** *%s* /n>**Task名称:** *%s* /n>**任务提交时间:** *%s* /n>**执行开始时间:** *%s* /n>**执行结束时间:** *%s* /n>**任务耗时:** *%s* /n" % (
            task_name, project_name, process_name, task_name,
            _format_time(submit_time), _format_time(start_time),
            _format_time(end_time), '-' if duration is None else duration)
        title = '【%s - 任务检测到慢查询!!!】' % task_name
        status = send_dingding(title, alert_msg)
        if status != 'ok':
            print(status)


def send_dingding(title, text, timeout=10):
    """Post a markdown message to the signed DingTalk custom-robot webhook, @-ing everyone.

    The two-character marker ``/n`` in *title* and *text* is expanded to a
    blank line (``\\n\\n``) before sending.

    :param title: markdown message title.
    :param text: markdown message body.
    :param timeout: seconds to wait for the webhook, so a stalled connection
        cannot hang the run forever.
    :return: ``'ok'`` when the response's ``errmsg`` is ``'ok'``, otherwise a
        string starting with ``'fail: '`` (response body or request error).
    """
    timestamp, sign = get_timestamp_sign()
    # NOTE(review): the access token is hard-coded; consider moving it to configuration.
    url = ("https://oapi.dingtalk.com/robot/send?access_token=你的token"
           "&timestamp=" + timestamp + "&sign=" + sign)
    headers = {"Content-type": "application/json;charset=utf-8"}
    payload = {
        "msgtype": "markdown",
        "markdown": {
            # Expand the marker before serialising so json.dumps escapes the
            # newlines. Replacing after dumps (as before) put raw newline
            # characters inside JSON strings, which is not valid JSON.
            "title": ("%s" % title).replace('/n', '\n\n'),
            "text": ("%s" % text).replace('/n', '\n\n'),
        },
        "at": {
            "isAtAll": True
        }
    }
    try:
        res = requests.post(url, data=json.dumps(payload), headers=headers,
                            timeout=timeout)
    except requests.RequestException as exc:
        return 'fail: %s' % exc

    try:
        errmsg = res.json().get('errmsg')
    except (ValueError, AttributeError):
        # Body is not a JSON object; report it as a failure below.
        errmsg = None
    if errmsg == 'ok':
        return 'ok'

    return 'fail: %s' % res.text


def get_timestamp_sign(secret=None, timestamp=None):
    """Build the ``timestamp`` and ``sign`` query parameters for a signed DingTalk robot webhook.

    sign = quote_plus(base64(HMAC-SHA256(key=secret, msg=timestamp + newline + secret)))

    :param secret: robot signing secret; defaults to the value hard-coded below.
    :param timestamp: milliseconds since the Unix epoch (int or str);
        defaults to the current time.
    :return: ``(timestamp, sign)``, both strings, ready to append to the URL.
    """
    # urllib.parse is a submodule: a plain "import urllib" does not guarantee it
    # is loaded, so import it explicitly here.
    import urllib.parse

    if secret is None:
        # NOTE(review): a signing secret should not live in source control;
        # move it to configuration. DingTalk signing secrets start with "SEC".
        secret = "72b70fb27f1d0d5e218d676ed9c250126e4d1df3fe98e422a88d6dece337f6d4"
    if timestamp is None:
        timestamp = round(time.time() * 1000)
    timestamp = str(timestamp)

    string_to_sign = '{}\n{}'.format(timestamp, secret)
    hmac_code = hmac.new(secret.encode('utf-8'),
                         string_to_sign.encode('utf-8'),
                         digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return (timestamp, sign)


if __name__ == "__main__":
    # One check per invocation. NOTE: BlockingScheduler is imported at the top
    # of the file but not used, so periodic runs must be scheduled externally.
    main()

效果展示

Dolphinscheduler中Hive慢查询的耗时最时间最长告警_mysql

2.Grafana查询展示 

-- Grafana table panel: today's 50 longest-running SHELL task instances.
-- All three tables are qualified with the same schema; previously only
-- t_ds_task_instance was, so the joins resolved against the datasource's
-- default database instead.
-- "耗时" = (end_time - start_time) in hours, rounded to 2 decimals; NULL while
-- a task has no end_time. Note: "序号" actually holds the project id (c.id).
select
    c.id as "序号",
    c.name as "项目名称" ,
    b.name as "任务名称" ,
    a.name as "Task名称" ,
    a.submit_time as "任务提交时间",
    a.start_time as "执行开始时间",
    a.end_time as "执行结束时间",
    round((unix_timestamp(a.end_time)-unix_timestamp(a.start_time))/3600,2) as "耗时"
from dolphinschedulernew.t_ds_task_instance a
left join dolphinschedulernew.t_ds_process_definition b on a.process_definition_id = b.id
left join dolphinschedulernew.t_ds_project c on b.project_id = c.id
where a.submit_time >= date_format(now(),'%Y-%m-%d 00:00:00')
    and a.task_type = 'SHELL'
order by 8 desc
limit 50

Dolphinscheduler中Hive慢查询的耗时最时间最长告警_mysql_02

网友评论