send.py 1 # -*- coding:utf8 -*- 2 import requests 3 import json 4 from reqwx import get_access_token, send_module_msg, get_module_id 5 a_Token = get_access_token() 6 7 url = " https://api.weixin.qq.com/cgi-bin/message/custom/send " 8 9 req_
send.py
# -*- coding:utf8 -*-
"""Send a WeChat template message to the first follower of the account.

Flow: fetch an access token, list the account's followers, take the first
openid, look up the first private template id and post the template message.
"""
import json

import requests

from reqwx import get_access_token, send_module_msg, get_module_id

# Seconds to wait for the WeChat API before giving up instead of hanging.
REQUEST_TIMEOUT = 10

a_Token = get_access_token()
if not a_Token:
    # Without a token every following request would be rejected anyway.
    raise SystemExit("could not obtain an access_token; check APPID/SECRET")

# Customer-service message endpoint. Only referenced by the text payload
# below, which this script builds but does not send.
url = "https://api.weixin.qq.com/cgi-bin/message/custom/send"

# Fetch the follower list and pick the first openid as the recipient.
req_get_uid_url = "https://api.weixin.qq.com/cgi-bin/user/get"
para_get_uid_data = {"access_token": a_Token}
req_get_uid = requests.get(
    req_get_uid_url,
    params=para_get_uid_data,
    timeout=REQUEST_TIMEOUT,
)
res_b = req_get_uid.content
res = json.loads(res_b)
print(res)

# The reply may lack data.openid (for example an API error reply); stop with
# a readable message instead of a bare KeyError/IndexError.
openid_list = (res.get("data") or {}).get("openid") or []
if not openid_list:
    raise SystemExit(
        "no follower openid in user/get response: {}".format(res)
    )
openid = openid_list[0]

# Plain-text message payload (built for reference, not sent by this script).
ss = "你好"
txt_body = {
    "touser": openid,
    "msgtype": "text",
    "text": {
        "content": ss,
    },
}

# Template data: each key is a template field with its value and text colour.
sys_name = "测试系统说明"
sys_info = "测试信息内容"
ss_body = {
    "sysname": {
        "value": sys_name,
        "color": "#FF3117",
    },
    "sysinfo": {
        "value": sys_info,
        "color": "#173177",
    },
}

module_id = get_module_id(a_Token)
send_module_msg(a_Token, openid, module_id, ss_body)
reqwx.py
"""Small helpers around the WeChat Official Account HTTP API."""
import json

import requests

from config import APPID, SECRET  # get config from config.py

# Seconds to wait for the WeChat API before giving up instead of hanging.
_REQUEST_TIMEOUT = 10


def to_json(str):
    """Parse a JSON document (text or bytes) and return the decoded object.

    The parameter keeps its original name for backward compatibility even
    though it shadows the builtin ``str`` inside this function.
    """
    return json.loads(str)


def get_access_token():
    """Fetch the global API credential (valid for 2 hours by default).

    On success the token is also written to ``config.ini`` in the current
    working directory as the text ``"token" + access_token``.

    :return: the access token string, or ``None`` when the reply carries
        no (or an empty) ``access_token``.
    """
    result = requests.get(
        url="https://api.weixin.qq.com/cgi-bin/token",
        params={
            "grant_type": "client_credential",
            "appid": APPID,
            "secret": SECRET,
        },
        timeout=_REQUEST_TIMEOUT,
    ).json()
    access_token = result.get("access_token") or None
    if access_token is not None:
        # Only persist a real token: concatenating None would raise
        # TypeError and leave config.ini truncated.
        with open("config.ini", "w") as f:
            f.write("token" + access_token)
    return access_token


def sendmsg(openid, a_Token, msg):
    """Send a plain-text customer-service message to a user.

    :param openid: recipient user id
    :param a_Token: access token used as the ``access_token`` query parameter
    :param msg: message text
    :return: raw response body (bytes) of the API call
    """
    send_url = "https://api.weixin.qq.com/cgi-bin/message/custom/send"
    body = {
        "touser": openid,
        "msgtype": "text",
        "text": {
            # The text message field is "content" (was misspelled "context").
            "content": msg,
        },
    }
    # bytes(dict, encoding=...) raises TypeError; serialise to JSON first.
    # ensure_ascii=False sends non-ASCII text as real UTF-8 characters
    # rather than \uXXXX escapes.
    payload = json.dumps(body, ensure_ascii=False).encode("UTF-8")
    s_req = requests.post(
        send_url,
        params={"access_token": a_Token},
        data=payload,
        timeout=_REQUEST_TIMEOUT,
    )
    return s_req.content


def send_module_msg(a_Token, openid, t_id, msg):
    """Send a template message.

    :param a_Token: access token used as the ``access_token`` query parameter
    :param openid: recipient user id
    :param t_id: template id
    :param msg: template data, placed under the ``data`` key of the request
    :return: raw response body (bytes) of the API call
    """
    send_url = "https://api.weixin.qq.com/cgi-bin/message/template/send"
    para = {"access_token": a_Token}
    body = {
        "touser": openid,
        "topcolor": "#FE0000",
        "template_id": t_id,
        "data": msg,
    }
    r = requests.post(
        send_url, params=para, json=body, timeout=_REQUEST_TIMEOUT
    )
    return r.content


def get_module_id(a_Token):
    """Return the ``template_id`` of the first private template.

    :param a_Token: access token used as the ``access_token`` query parameter
    :raises KeyError: when the reply has no ``template_list``
    :raises IndexError: when ``template_list`` is empty
    """
    para_m_url = (
        "https://api.weixin.qq.com/cgi-bin/template/get_all_private_template"
    )
    req_m_id = requests.post(
        para_m_url,
        params={"access_token": a_Token},
        timeout=_REQUEST_TIMEOUT,
    )
    module_list = to_json(req_m_id.content)
    module_id = module_list["template_list"][0]["template_id"]
    return module_id
后期优化:
token 持久化
获取人员和消息模板,发送消息。
集成到其他系统,例如由监控告警触发消息发送、自动化测试完成后发送结果等。