1. 背景
团队维护多套业务系统,有支付系统、金融系统、数据系统、核验系统等二十多套业务系统,每套业务系统有10 - 50台服务器不等,当前团队中存在一套自动化巡检系统,每十分钟巡检一次,有异常可实时告警。但每天还需要人工登录服务器巡检两次,因人工巡检耗时长、漏巡风险大,需要一个运维巡检工具帮助实现快速人工巡检。
2. 巡检逻辑
3. 功能实现
通过巡检机的python脚本登录各系统服务器并执行存在的shell巡检脚本,获取shell脚本产生的巡检结果并处理,最终生成Excel文件。3.1 shell部分
#!/bin/bash # 全系统统一巡检脚本 # ↓ ↓ ↓ 写在脚本开头 ↓ ↓ ↓ # # global variables log_dir=/tmp/unity/unitylog # 日志路径 exe_dir=/tmp/unity # 执行路径 clean_size=200 # 日志清理大小 # create log directory if [ ! -d $log_dir ];then mkdir -p $log_dir else # clean log # clean up when log file is greater than $clean_size. log_size=`du -sm $log_dir/unity_patrol.log | awk '{print $1}'` if [ $log_size -ge $clean_size ];then echo > $log_dir/unity_patrol.log fi fi echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [SYSTEM] Start System Patrol." >> $log_dir/unity_patrol.log # create a title cat $exe_dir/unity_patrol.sh | grep "item_name=" | grep -v "grep" | awk -F"=" '{print $2}' | awk '{print $1}' | tr '\n' ',' | awk '{print $0}' > $log_dir/unity_patrol.txt # ↑ ↑ ↑ 写在脚本开头 ↑ ↑ ↑ # # HA状态 item_name=HA_STATUS # 巡检项 alarm_value="ST_STABLE" # 告警值 echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [$item_name] START" >> $log_dir/unity_patrol.log echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [$item_name] get hacmp status." >> $log_dir/unity_patrol.log lssrc -ls clstrmgrES >> $log_dir/unity_patrol.log hacmp_status=`lssrc -ls clstrmgrES | grep "Current state" | awk -F":" '{print $2}' | awk '{print $1}'` if [ "$hacmp_status" == "$alarm_value" ];then echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [$item_name] hacmp status is $subsystem_status." >> $log_dir/unity_patrol.log echo "hacmp status:$hacmp_status" >> $log_dir/unity_patrol.tmp echo "[$item_name] hacmp status: $hacmp_status ==> OK" else echo "[$(date "+%Y-%m-%d %H:%M:%S")] [ERROR] [$item_name] hacmp status is $subsystem_status." >> $log_dir/unity_patrol.log echo "hacmp status:$hacmp_status*" >> $log_dir/unity_patrol.tmp echo "[$item_name] hacmp status: $hacmp_status ==> Failed" echo "[$item_name] lssrc -ls clstrmgrES" fi echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [$item_name] END" >> $log_dir/unity_patrol.log # 磁盘监控 ... # 内存监控 ... # cpu监控 ... # xxx服务监控 ... # ↓ ↓ ↓ 写在脚本结尾 ↓ ↓ ↓ # # global variables log_dir=/tmp/unity/unitylog # 日志路径 # process all inspection data cat $log_dir/unity_patrol.tmp | tr '\n' ',' | awk '{print $0}' >> $log_dir/unity_patrol.txt rm -rf $log_dir/*.tmp echo "[$(date "+%Y-%m-%d %H:%M:%S")] [INFO] [SYSTEM] the system patrol has been completed." >> $log_dir/unity_patrol.log # ↑ ↑ ↑ 写在脚本结尾 ↑ ↑ ↑ #写在脚本开头
3.2 Python部分
3.2.1 定义全局变量
import os import re import sys import time import email from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.application import MIMEApplication import smtplib import imaplib import pymysql import binascii from pyDes import des, CBC, PAD_PKCS5 import paramiko import pandas as pd import openpyxl from openpyxl.styles.borders import Border, Side from openpyxl.utils import get_column_letter from openpyxl.styles import PatternFill class Encrypt: # 全局变量 def __init__(self): # 配置加密key self.key = '12345678' self.iv = self.key # 配置数据库连接和服务器登录信息 self.conn = pymysql.connect(host='192.168.x.x', user='admin', password='xxxxxx', database='xxDB', port=3306) self.cursor = self.conn.cursor() self.cursor_getservers = self.conn.cursor() self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 获取业务系统IP self.systems = self.get_servers() self.title_list = ['IP', 'SERVER_TYPE'] # 设置Excel文件导出位置 self.export_excel = "C:\\Users\\user\\Desktop\\export\\export_excel.xlsx" self.xlsx_file_name = pd.ExcelWriter(self.export_excel) # 密码加密 def des_encrypt(self, txt): k = des(self.key, CBC, self.iv, pad=None, padmode=PAD_PKCS5) en = k.encrypt(txt, padmode=PAD_PKCS5) return binascii.b2a_hex(en) # 密码解密 def des_decrypt(self, txt): k = des(self.key, CBC, self.iv, pad=None, padmode=PAD_PKCS5) de = k.decrypt(binascii.a2b_hex(txt), padmode=PAD_PKCS5) return de # 获取各系统登录数据 def get_serverinfo(self, system): select_systeminfo = 'select system, ip, server_type, user, passwd from tbl_systeminfo where system = \'{}\''.format(system) self.cursor.execute(select_systeminfo) return self.cursor.fetchall() # 获取所有系统名 def get_servers(self): select_systems = 'select system from tbl_systeminfo group by system' self.cursor_getservers.execute(select_systems) systems_list = [] for system in self.cursor_getservers.fetchall(): systems_list.append(system[0]) return systems_list3.2.2 远程执行shell脚本
# 登录并执行shell脚本 def login_exec_sh(self, host, username, password): try: self.ssh.connect(hostname=host, username=username, password=password, timeout=5) stdin, stdout, stderr = self.ssh.exec_command('[ -f /tmp/monitx/unity_patrol.sh ] && sh /tmp/monitx/unity_patrol.sh || echo "SSH_ERROR"') stdout_content = stdout.read().decode('utf-8') if 'SSH_ERROR' in stdout_content: exp_info = 'SSH_ERROR' else: stdin1, stdout1, stderr1 = self.ssh.exec_command('cat /tmp/monitx/unitylog/unity_patrol.txt') exp_info = stdout1.read().decode('utf-8') except Exception as e: print(e) exp_info = 'SSH_ERROR' return exp_info3.2.3 处理脚本执行结果
# 数据处理 def run(self): # 判断是否为全系统巡检 # 若需要巡检部分系统输入巡检的系统名称,否则为全系统巡检 print("""请输入要巡检的系统名称,以空格为分隔符 不输入为全系统巡检 """) xj_system = input("输入要巡检的系统: ").splitlines() xj_system = "".join(xj_system).split(' ') print(""" --> 开始全系统巡检 <-- """) for input_system in xj_system: if input_system in self.systems: xj_system = xj_system else: print(input_system, "系统不存在, 请重新填写!") print(""" --> 巡 检 结 束 <--""") exit() summary_title = ['SYSTEM', 'ERROR_NUM'] summary_list = [] # 根据业务系统获取ip地址、用户名、密码等信息 for system in xj_system: system_all = self.get_serverinfo(system) header_title = [] system_patrol_list = [] sum_error = 0 for system_info in system_all: host = system_info[1] server_type = system_info[2] username = system_info[3] passwd = self.des_decrypt(system_info[4]) xj_info = self.login_exec_sh(host, username, passwd).splitlines() print("正在处理 {}: {}".format(system, host)) # 判断ssh是否成功 if 'SSH_ERROR' in xj_info: host = system_info[1] + '*' # IP地址前两段脱敏 host_des = ".".join([str(part if i >= 2 else "#") for i, part in enumerate(host.split("."))]) # 将固定标题内容与巡检项结果合并 ip_type = [host_des, server_type] title_ip_dict = dict(zip(self.title_list, ip_type)) xj_items = dict(title_ip_dict) system_patrol_list.append(xj_items) # system_errors = [system, sum_error] # summary_sheet = dict(zip(summary_title, system_errors)) else: header_title.append(xj_info[0]) # 删除最后一个空元素 xj_header_title = "".join(xj_info[0]).split(',')[0:-1] xj_content = "".join(xj_info[1]).replace(' ','\n').strip().split(',')[0:-1] # 将巡检项和巡检结果对应 hd_ct_dict = dict(zip(xj_header_title, xj_content)) # IP地址前两段脱敏 host_des = ".".join([str(part if i >= 2 else "#") for i, part in enumerate(host.split("."))]) # 将固定标题内容与巡检项结果合并 ip_type = [host_des, server_type] title_ip_dict = dict(zip(self.title_list, ip_type)) xj_items = dict(title_ip_dict, **hd_ct_dict) system_patrol_list.append(xj_items) # 检查业务系统错误巡检项数量 for dic_item in system_patrol_list: for k, v in dic_item.items(): if "*" in v: sum_error += 1 sum_error_star = str(sum_error) + '*' system_errors = [system, sum_error_star] summary_sheet = dict(zip(summary_title, system_errors)) summary_list.append(summary_sheet) summary_sheet_df = pd.DataFrame(summary_list) # 将巡检内容转换为DataFrame格式并导出至excel xj_data_df = pd.DataFrame(system_patrol_list) xj_data_df = xj_data_df.fillna('-') xj_data_df.to_excel(self.xlsx_file_name, sheet_name=system, index=False) summary_sheet_df.to_excel(self.xlsx_file_name, sheet_name='总览', index=False) # 关闭mysql连接和ssh连接 self.conn.close() self.ssh.close() self.xlsx_file_name.close()3.2.4 调整Excel表格样式
3.2.5 发送邮件
# 全局变量 def __init__(self): # 设置邮箱参数 self.smtp_server = 'smtp.163.com' self.imap_server = 'imap.163.com' self.smtp_port = 25 self.sender_email = 'xxx@163.com' self.sender_passwd ='xxxxxxxx' self.receiver_email = ['xxx@163.com', 'xxx@qq.com', 'xxx@sina.com'] # 发送邮件 def send_email(self): # 连接到IMAP服务器 try: mail = imaplib.IMAP4_SSL(self.imap_server) mail.login(self.sender_email, self.sender_passwd) except: time.sleep(1) # 判断文件是否存在 try: if os.path.exists(self.export_excel): with open(self.export_excel, 'rb') as f: excel_data = f.read() msg_root = MIMEMultipart() msg_root['From'] = self.sender_email msg_root['To'] = ','.join(self.receiver_email) msg_root['Subject'] = '系统巡检结果' msg_text = MIMEText('全系统统一巡检结果, 详情见附件。') msg_root.attach(msg_text) # 添加附件 excel_send = MIMEApplication(excel_data) excel_send["Content-Type"] = 'application/octet-stream' excel_send.add_header('Content-Disposition', 'attachment', filename='统一巡检结果.xlsx') msg_root.attach(excel_send) # 登录邮箱 smtp = smtplib.SMTP(self.smtp_server, self.smtp_port) smtp.starttls() smtp.login(self.sender_email, self.sender_passwd) # 发送邮件 smtp.sendmail(self.sender_email, self.receiver_email, msg_root.as_string()) smtp.quit() except Exception as e: print("No file:", e)3.2.6 通过邮件远程巡检
