环境 Centos 7.4 Python 2.7 Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1 Elasitcsearch6.3.2 知识点 调用Python Elasticsearh API Python Mysqldb使用 DSL查询与聚合 Python 列表操作 代码 #!/usr/bin/env python# -*- coding: utf-
环境
- Centos 7.4
- Python 2.7
- Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
- Elasitcsearch6.3.2
知识点
- 调用Python Elasticsearh API
- Python Mysqldb使用
- DSL查询与聚合
- Python 列表操作
代码
#!/usr/bin/env python # -*- coding: utf-8 -*- #minyt 2018.9.1 #获取24小时内出现的模块次数 # 该程序通过elasticsearch python client 获取相关精简数据,可以计算请求数、超时数、错误数、正确率、错误率等等 import MySQLdb from elasticsearch import Elasticsearch from elasticsearch import helpers #定义elasticsearch集群索引名 index_name = "logstash-nginxlog-*" #实例化Elasticsearch类,并设置超时间为180秒,默认是10秒的,如果数据量很大,时间设置更长一些 es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180) #DSL(领域特定语言)查询语法,查询top50 sname的排列次数 data_sname = { "aggs": { "2": { "terms": { "field": "apistatus.sname.keyword", "size": 100, "order": { "_count": "desc" } } } }, "size": 0, "_source": { "excludes": [] }, "stored_fields": [ "*" ], "script_fields": {}, "docvalue_fields": [ "@timestamp" ], "query": { "bool": { "must": [ { "match_all": {} }, { "range": { "@timestamp": { "gte" : "now-24h/h", "lt" : "now/h" } } } ], "filter": [], "should": [], "must_not": [] } } } #按照DSL(特定领域语言)语法查询获取数据 def get_original_data(): try: #根据上面条件搜索数据 res = es.search( index=index_name, size=0, body=data_sname ) return res except: print "get original data failure" #初始化数据库 def init_mysql(): # 打开数据库连接 db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 更新语句 sql = "update appname set count=0" try: # 执行SQL语句 cursor.execute(sql) # 提交到数据库执行 db.commit() except: # 发生错误时回滚 db.rollback() # 关闭数据库连接 db.close() def updata_mysql(sname_count,sname_list): # 打开数据库连接 db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' ) # 使用cursor()方法获取操作游标 cursor = db.cursor() # SQL 更新语句 sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list) try: # 执行SQL语句 cursor.execute(sql) # 提交到数据库执行 db.commit() except: # 发生错误时回滚 db.rollback() # 关闭数据库连接 db.close() #根据Index数据结构通过Elasticsearch Python Client上传数据到新的Index def import_process_data(): try: #列表形式显示结果 res = get_original_data() #print res res_list = res.get('aggregations').get('2').get('buckets') #print res_list #初始化数据库 init_mysql() #获取24小时内出现的SNAME for value in res_list: sname_list = value.get('key') sname_count = value.get('doc_count') print sname_list,sname_count #更新sname_status值 updata_mysql(sname_count,sname_list) except Exception, e: print repr(e) if __name__ == "__main__": import_process_data()总结
关键是DSL语法的编写涉及查询与聚合可以通过kibana的visualize或者devtool先测试出正确语法,然后结合python对列表、字典、除法、字符串等操作即可。下面汇总下各个算法:
-
总请求http_host.keyword: api.mydomain.com
-
超长请求http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*错误
-
错误请求apistatus.status.keyword:*错误 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )
-
请求健康度域名与request_time聚合,域名请求时间小于3秒的次数除以总请求次数对应各个域名健康度
- 请求正确率域名与http状态码聚合,域名http状态码为200的次数除以域名总请求数对应各个域名的请求正确率