Preface

Troubleshooting speed in operations tends to be proportional to the granularity of the monitoring system: faults can only be located quickly when monitoring is in place.

Before this system was deployed, all system logs on the platform went through Graylog + Zabbix, which alerted on error keywords appearing in the logs. That approach exposed a number of shortcomings in day-to-day operations (not detailed here). After weighing several factors, we replaced the log alerting system with: ELK + Kafka + Filebeat + Elastalert.

This article is organized around two requirements:

- Alerting on abnormal server logins outside working hours
- Alerting on error keywords in system logs
Architecture

Service selection
| name | version | info |
| --- | --- | --- |
| Amazon Elasticsearch Service | v6.2 | Deployed following the AWS official guide |
| Logstash | v6.2.3 | Same version as ES |
| Filebeat | v6.2.3 | Same version as ES |
| Confluent (Kafka) | v4.0 | The Confluent distribution is recommended here. Confluent is a big-data company focused on enterprise Kafka, founded by Kafka co-creator Neha Narkhede and other former LinkedIn colleagues after they left LinkedIn. |
| Elastalert | v0.1.29 | X-Pack was considered first, but AWS does not yet support deploying it |
Operating system used in this article: CentOS release 6.6
Filebeat
```bash
# Download the package
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm
# Install
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm
```

Logstash
```bash
# Import the GPG key and add the Yum repository
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ cat <<EOF > /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install
$ yum install logstash -y
```

Elastalert
```bash
# Install directly with pip
$ pip install elastalert
# If dependency errors come up, these are the commonly needed development packages
$ yum install -y zlib openssl openssl-devel gcc gcc-c++ Xvfb libXfont Xorg libffi libffi-devel python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget
```

Configuration
Filebeat

`/etc/filebeat/filebeat.yml`
```yaml
filebeat.config:
  prospectors:
    path: /etc/filebeat/conf/*.yml
    reload.enabled: true
    reload.period: 10s

output.kafka:
  # kafkaNode is the host running the Kafka service
  hosts: ["kafkaNode:9092"]
  # The topic is taken from fields.out_topic
  topic: "%{[fields][out_topic]}"
  partition.round_robin:
    reachable_only: false
```

`/etc/filebeat/conf/base.yml`
```yaml
# Collect system logs
- type: log
  paths:
    - /var/log/messages
    - /var/log/syslog*
  exclude_files: [".gz$"]
  exclude_lines: ["ssh_host_dsa_key"]
  tags: ["system_log"]
  scan_frequency: 1s
  fields:
    # Extra field used to identify the source client
    server_name: client01
    # Kafka topic / index name
    out_topic: "system_log"
  multiline:
    pattern: "^\\s"
    match: after

# Collect login logs
- type: log
  paths:
    - /var/log/secure*
    - /var/log/auth.log*
  tags: ["system_secure"]
  exclude_files: [".gz$"]
  scan_frequency: 1s
  fields:
    server_name: client01
    out_topic: "system_secure"
  multiline:
    pattern: "^\\s"
    match: after
```
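Before wiring up Logstash, it is worth checking that Filebeat events are actually reaching Kafka. A minimal sketch using the kafka-python client (an assumption on my part; it is not part of the original setup, and any Kafka consumer will do):

```python
# pip install kafka-python  -- hypothetical helper, not part of the stack above
import json

from kafka import KafkaConsumer

# Tail the topic that Filebeat publishes to (fields.out_topic above).
consumer = KafkaConsumer(
    'system_log',
    bootstrap_servers='kafkaNode:9092',
    auto_offset_reset='latest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    consumer_timeout_ms=10000,  # give up if nothing arrives within 10s
)

for record in consumer:
    event = record.value
    # Each event carries the custom fields configured in base.yml.
    print(event['fields']['server_name'], event.get('message', ''))
```

If nothing is printed, check the Filebeat logs before touching the Logstash side.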
Logstash

`/etc/logstash/conf.d/system_log.conf`
```conf
input {
    kafka {
        bootstrap_servers => "kafkaNode:9092"
        consumer_threads => 3
        topics => ["system_log"]
        auto_offset_reset => "latest"
        codec => "json"
    }
}

filter {
    # Drop Logstash's own stdout log
    if [source] == "/var/log/logstash-stdout.log" {
        drop {}
    }
    if [fields][out_topic] == "system_log" {
        date {
            match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
        }
        grok {
            match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
            pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
            remove_field => "message"
        }
    }
}

output {
    elasticsearch {
        hosts => ["<Amazon ES endpoint>"]
        index => "%{[fields][out_topic]}_%{+YYYYMMdd}"
        document_type => "%{[@metadata][type]}"
    }
}
```

`/etc/logstash/conf.d/secure_log.conf`
```conf
input {
    kafka {
        bootstrap_servers => "kafkaNode:9092"
        consumer_threads => 3
        topics => ["system_secure"]
        auto_offset_reset => "latest"
        codec => "json"
    }
}

filter {
    if [fields][out_topic] == "system_secure" {
        grok {
            match => { "message" => [
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"
            ] }
            pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
            remove_field => "message"
        }
    }
}

output {
    elasticsearch {
        hosts => ["<Amazon ES endpoint>"]
        index => "%{[fields][out_topic]}_%{+YYYYMMdd}"
        document_type => "%{[@metadata][type]}"
    }
}
```
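With both pipelines running, a quick way to confirm that documents are landing in ES is to query the daily indices' `_count` endpoint. A small sketch (replace `<Amazon ES endpoint>` with the same host used in the outputs above):

```python
import datetime

import requests

# Index names follow the Logstash output pattern: <topic>_YYYYMMdd.
today = datetime.datetime.now().strftime('%Y%m%d')

for topic in ('system_log', 'system_secure'):
    index = '{0}_{1}'.format(topic, today)
    # _count is a standard Elasticsearch API.
    resp = requests.get('http://<Amazon ES endpoint>/{0}/_count'.format(index))
    print(index, resp.json().get('count'))
```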
Kafka

```bash
# Import the key and add the repositories
rpm --import https://packages.confluent.io/rpm/4.0/archive.key
cat <<EOF > /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/6
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1
EOF
yum install confluent-platform-oss-2.11
```

Elastalert
Elastalert can be deployed on any host that can reach ES. In the rule files below, `modules.eagle_post.EagleAlerter` and `blacklist_v2` are custom modifications, covered later in this article.
`rules/system_log.yaml`
```yaml
es_host: <Amazon ES endpoint>
es_port: 80
name: system log rule
type: blacklist_v2
index: system_log*
timeframe:
  minutes: 1
# Field to monitor
compare_key: system.syslog.message
# Alert when any of the following keywords appears; add more as needed
blacklist_v2:
- "ERROR"
- "error"
alert: "modules.eagle_post.EagleAlerter"
eagle_post_url: "<eagle>"
eagle_post_all_values: False
eagle_post_payload:
  server: "fields.server_name"
  info: "system.syslog.message"
  source: "source"
```

`rules/system_secure.yaml`
```yaml
es_host: <Amazon ES endpoint>
es_port: 80
name: system secure rule
type: frequency
index: system_secure*
num_events: 1
timeframe:
  minutes: 1
filter:
- query:
    wildcard:
      system.auth.user: "*"
alert: "modules.eagle_post.EagleAlerter"
eagle_post_url: "<eagle>"
eagle_post_all_values: False
# Working-hours window; alerts are only sent outside it
eagle_time_start: "09:00"
eagle_time_end: "18:00"
eagle_post_payload:
  user: "system.auth.user"
  server: "fields.server_name"
  ip: "system.auth.ssh.ip"
  event: "system.auth.ssh.event"
```
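What Eagle ultimately receives is a flat form POST built from `eagle_post_payload`. A hypothetical smoke test mimicking the payload the secure rule above produces (`<eagle>` and the sample values are placeholders):

```python
import requests

# Roughly what EagleAlerter (shown below) posts for an off-hours login.
payload = {
    'user': 'root',
    'server': 'client01',
    'ip': '203.0.113.7',          # example source address
    'event': 'Accepted password',
}

# verify=False matches the alerter's behaviour in this setup.
requests.post('<eagle>', data=payload, verify=False)
```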
Custom type and alert
To feed alerts into Eagle (our in-house unified API platform), I first tried the http_post alert type, but found that values from the ES results could not be passed as POST parameters. So I made a small modification and added a new alerter type so alerts flow into Eagle seamlessly.
Alert
`modules/eagle_post.py`
Save this module directory under `site-packages/elastalert`.
```python
import datetime

import dateutil.parser
import requests

from elastalert.alerts import Alerter
from elastalert.util import elastalert_logger
from elastalert.util import lookup_es_key


class EagleAlerter(Alerter):
    def __init__(self, rule):
        super(EagleAlerter, self).__init__(rule)
        # Time window during which alerts are suppressed
        self.post_time_start = self.rule.get('eagle_time_start', '00:00')
        self.post_time_end = self.rule.get('eagle_time_end', '00:00')
        # POST target URL
        self.post_url = self.rule.get('eagle_post_url', '')
        self.post_payload = self.rule.get('eagle_post_payload', {})
        self.post_static_payload = self.rule.get('eagle_post_static_payload', {})
        self.post_all_values = self.rule.get('eagle_post_all_values', False)
        self.post_lock = False

    def alert(self, matches):
        if not self.post_url:
            elastalert_logger.info('Please input eagle url!')
            return False
        for match in matches:
            # Start from the full match if requested
            payload = match if self.post_all_values else {}
            # Map configured payload keys to values looked up in the ES match
            for post_key, es_key in self.post_payload.items():
                payload[post_key] = lookup_es_key(match, es_key)
            # Current time
            login_time = datetime.datetime.now().time()
            # Parse the configured window
            time_start = dateutil.parser.parse(self.post_time_start).time()
            time_end = dateutil.parser.parse(self.post_time_end).time()
            # Inside the window: do not alert
            self.post_lock = False if time_start < login_time < time_end else True
            # Merge static and dynamic payloads (copy, so the static
            # payload is not mutated across matches)
            data = dict(self.post_static_payload)
            data.update(payload)
            # Send the alert
            if self.post_lock:
                session = requests.Session()
                session.post(url=self.post_url, data=data, verify=False)
                elastalert_logger.info("[-] eagle alert sent.")
            else:
                elastalert_logger.info("[*] nothing to do.")

    def get_info(self):
        return {'type': 'http_post'}
```
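The working-hours check is easy to get backwards, so it can be sanity-tested in isolation. A small sketch mirroring the alerter's logic (the function name is mine):

```python
import dateutil.parser


def should_alert(now_str, start='09:00', end='18:00'):
    """Mirror EagleAlerter: suppress alerts inside the window, send outside it."""
    now = dateutil.parser.parse(now_str).time()
    time_start = dateutil.parser.parse(start).time()
    time_end = dateutil.parser.parse(end).time()
    return not (time_start < now < time_end)


print(should_alert('12:30'))  # False -> working hours, suppressed
print(should_alert('23:10'))  # True  -> off-hours login triggers an alert
```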
Type

While using the stock blacklist type, I found that it only does whole-value matching. To make rule files easier to write, I modified it slightly to match substrings.
`elastalert/ruletypes.py`
```python
# New rule type
class BlacklistV2Rule(CompareRule):
    required_options = frozenset(['compare_key', 'blacklist_v2'])

    def __init__(self, rules, args=None):
        super(BlacklistV2Rule, self).__init__(rules, args=None)
        self.expand_entries('blacklist_v2')

    def compare(self, event):
        term = lookup_es_key(event, self.rules['compare_key'])
        if term is None:
            # The compared field is absent from this event
            return False
        # Looping over the configured keywords costs some performance,
        # but it will do until a better solution turns up.
        for keyword in self.rules['blacklist_v2']:
            if keyword in term:
                return True
        return False
```
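The practical difference from the stock blacklist type comes down to substring versus whole-value matching; a two-line illustration:

```python
message = 'Apr 25 09:14:55 client01 app[1234]: connection error: timeout'
keywords = ['ERROR', 'error']

# Stock BlacklistRule: the whole field value must equal a blacklist entry.
print(message in keywords)                        # False -> no alert

# BlacklistV2Rule.compare: any keyword occurring inside the value matches.
print(any(word in message for word in keywords))  # True  -> alert
```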
`elastalert/config.py`

```python
# Register the new type
rules_mapping = {
    'frequency': ruletypes.FrequencyRule,
    'any': ruletypes.AnyRule,
    'spike': ruletypes.SpikeRule,
    'blacklist': ruletypes.BlacklistRule,
    'blacklist_v2': ruletypes.BlacklistV2Rule,
    'whitelist': ruletypes.WhitelistRule,
    'change': ruletypes.ChangeRule,
    'flatline': ruletypes.FlatlineRule,
    'new_term': ruletypes.NewTermsRule,
    'cardinality': ruletypes.CardinalityRule,
    'metric_aggregation': ruletypes.MetricAggregationRule,
    'percentage_match': ruletypes.PercentageMatchRule,
}
```

`elastalert/schema.yaml`
```yaml
# New entry
- title: BlacklistV2
  required: [blacklist_v2, compare_key]
  properties:
    type: {enum: [blacklist_v2]}
    compare_key: {'items': {'type': 'string'}, 'type': ['string', 'array']}
    blacklist_v2: {type: array, items: {type: string}}
```

Packaging into Docker
Here is a simple Dockerfile for reference:
```dockerfile
FROM python:2.7-alpine

ENV SITE_PACKAGES /usr/local/lib/python2.7/site-packages/elastalert

WORKDIR /opt/elastalert

RUN apk update && \
    apk add gcc ca-certificates openssl-dev openssl libffi-dev musl-dev tzdata openntpd && \
    pip install elastalert && \
    cp -rf /usr/share/zoneinfo/Asia/Taipei /etc/localtime

COPY ./ /opt/elastalert

CMD ["/opt/elastalert/start.sh"]
```

`start.sh`
```sh
#!/bin/sh
SITE_PATH=/usr/local/lib/python2.7/site-packages/elastalert
CONFIG=/opt/elastalert/config/config.yaml
MODULES=/opt/elastalert/modules

# Copy the custom alerter module into site-packages
if [ -d "${MODULES}" ]; then
    \cp -rf ${MODULES} ${SITE_PATH}
    echo "[-] Copy ${MODULES} to ${SITE_PATH}"
fi

# Overlay the patched elastalert sources
\cp -rf elastalert/* ${SITE_PATH}/
echo "[-] Copy elastalert/* to ${SITE_PATH}"

python -m elastalert.elastalert --verbose --config ${CONFIG}
```

With this groundwork in place, the image was added to the Bee container management platform for automated builds.
Results

Pitfalls encountered
Zookeeper
Problem

Older Kafka releases depend on ZooKeeper, and by default the broker registers itself under the address localhost. The resulting symptom looks like this:

Filebeat error log:
```
2018-04-25T09:14:55.590+0800 INFO kafka/log.go:36 client/metadata fetching metadata for [[[system_log] kafkaNode:9092]] from broker %!s(MISSING)
2018-04-25T09:14:55.591+0800 INFO kafka/log.go:36 producer/broker/[[0]] starting up
2018-04-25T09:14:55.591+0800 INFO kafka/log.go:36 producer/broker/[[0 %!d(string=system_log) 0]] state change to [open] on %!s(MISSING)/%!d(MISSING)
2018-04-25T09:14:55.591+0800 INFO kafka/log.go:36 producer/leader/[[system_log %!s(int32=0) %!s(int32=0)]]/%!d(MISSING) selected broker %!d(MISSING)
2018-04-25T09:14:55.591+0800 INFO kafka/log.go:36 producer/leader/[[system_secure %!s(int32=0) %!s(int=3)]]/%!d(MISSING) state change to [retrying-%!d(MISSING)]
2018-04-25T09:14:55.591+0800 INFO kafka/log.go:36 producer/leader/[[system_secure %!s(int32=0) %!s(int32=0)]]/%!d(MISSING) abandoning broker %!d(MISSING)
2018-04-25T09:14:55.592+0800 INFO kafka/log.go:36 producer/broker/[[0]] shut down
2018-04-25T09:14:55.592+0800 INFO kafka/log.go:36 Failed to connect to broker [[localhost:9092 dial tcp [::1]:9092: getsockopt: connection refused]]: %!s(MISSING)
```

The log shows two addresses: the configured Kafka address and a localhost address. Filebeat connects to the broker at kafkaNode:9092 without trouble, but the broker metadata it then fetches — which comes from the registration stored in ZooKeeper — advertises localhost, which Filebeat fails to reach.
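The broker registration can also be inspected programmatically instead of through zookeeper-shell. A sketch using the kazoo client (an assumption; it is not part of the original toolchain):

```python
# pip install kazoo  -- hypothetical helper for inspecting ZooKeeper
import json

from kazoo.client import KazooClient

zk = KazooClient(hosts='kafkaNode:2181')
zk.start()

# Broker 0's registration; 'endpoints' must advertise a reachable address,
# not localhost, or clients like Filebeat will be sent to the wrong host.
data, _stat = zk.get('/brokers/ids/0')
print(json.loads(data.decode('utf-8'))['endpoints'])

zk.stop()
```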
Solution
```bash
# In zookeeper-shell
get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1523429158364","port":9092,"version":4}
cZxid = 0x1d
ctime = Wed Apr 11 14:45:58 CST 2018
mZxid = 0x1d
mtime = Wed Apr 11 14:45:58 CST 2018
pZxid = 0x1d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x162b374170d0000
dataLength = 188
numChildren = 0

# The registered address is localhost -- change it
set /brokers/ids/0 {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaNode:9092"],"jmx_port":9999,"host":"kafkaNode","timestamp":"1523429158364","port":9092,"version":4}
```

Restart the services after the change and the problem is solved. (The durable fix is to set `advertised.listeners=PLAINTEXT://kafkaNode:9092` in the broker's `server.properties`, so the correct address is registered at startup.)
Original post: http://a-cat.cn/2018/04/29/aws-elk/