当前位置 : 主页 > 编程语言 > python >

django订阅gerrit事件流数据

来源:互联网 收集:自由互联 发布时间:2022-08-10
总体思路: 1、自定义django命令行功能(参考:https://blog.csdn.net/xujin0/article/details/88056620) 2、通过django命令行,启动监听gerrit事件流服务,根据相应类型提取字段,入库 1、自定义django命令行……

总体思路:

1、自定义django命令行功能(参考:https://blog.csdn.net/xujin0/article/details/88056620)

2、通过django命令行,启动监听gerrit事件流服务,根据相应类型提取字段,入库


1、自定义django命令行

from django.core.management import BaseCommand, CommandError
from subscribe.gerrit.Factory import CreatePatchSetFactory, ChangeAbandoneFactory, ChangeMergeFactory, ChangeRestoreFactory, TopicChangeFactory


class Command(BaseCommand):
    """Django management command that subscribes to Gerrit ``stream-events``.

    Usage: ``python manage.py subscribe <name>`` where ``<name>`` is either
    one of the keys of ``EVENT_FACTORIES`` or the literal ``all``.
    """

    help = "subscribe gerrit stream-events:"

    # Maps every supported event name to the factory class that listens for it.
    EVENT_FACTORIES = {
        "patchset-created": CreatePatchSetFactory,
        "change-merged": ChangeMergeFactory,
        "change-abandoned": ChangeAbandoneFactory,
        "change-restored": ChangeRestoreFactory,
        "topic-changed": TopicChangeFactory,
    }

    def add_arguments(self, parser):
        """Register the single positional argument ``name`` (the event to listen for)."""
        parser.add_argument('name', help="要监听哪个事件流\nall:全部\npatchset-created:创建patchset时\nchange-merged:合入patchset时\nchange-abandoned:废弃patchset时\nchange-restored:恢复patchset时\ntopic-changed:Topic更改时")

    def handle(self, *args, **options):
        """Start the listener(s) selected by ``options['name']``.

        Raises:
            CommandError: if the requested name is neither a supported event
                nor ``all``.
        """
        # Local import: the file's top-level import block is left untouched.
        import threading

        # 1.get param
        g_event = options['name']
        # "all" must be accepted here, otherwise the documented
        # ``subscribe all`` invocation is rejected before it is dispatched.
        support_events = ["all"] + list(self.EVENT_FACTORIES)
        # 2.check if param supported
        if g_event not in support_events:
            raise CommandError("args wrong, support are: %s" % support_events)

        if g_event != "all":
            self.EVENT_FACTORIES[g_event]().get_stream_event()
            return

        # Every get_stream_event() blocks for as long as its stream is open,
        # so calling them one after another would only ever run the first
        # listener. Give each listener its own thread instead.
        workers = [
            threading.Thread(
                target=factory_cls().get_stream_event,
                name="subscribe-%s" % event,
                daemon=True,  # do not keep the process alive after Ctrl-C
            )
            for event, factory_cls in self.EVENT_FACTORIES.items()
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

2、处理各个类型事件流数据逻辑

from cmback.settings import GERRIT_HOSTNAME, GERRIT_URL, GERRIT_PORT, G_USERNAME, G_PASSWORD
from gerrit import GerritClient
from cmapp.models import Git
from cmapp.serializers import GitSerializer
from abc import ABCMeta, abstractmethod
from django.db import close_old_connections
import sys
import paramiko
import os
import logging
import json
import datetime
import re


class GerritFactory(metaclass=ABCMeta):
    """
    Base class for Gerrit stream-events subscribers.

    Subscribes to gerrit stream-events and saves them to the local db in
    real time for display.
    https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.1/cmd-stream-events.html
    https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.1/json.html
    use:
    "change-abandoned"
    "change-merged"
    "change-restored"
    "topic-changed"
    "patchset-created"

    unused:
    "assignee-changed"
    "hashtags-changed"
    "ref-updated"
    "reviewer-added"
    "reviewer-deleted"
    "comment-added"
    "project-created"
    "change-deleted"

    Declared with ``metaclass=ABCMeta`` (the Python 3 spelling) so that
    ``@abstractmethod`` is actually enforced; a ``__metaclass__`` class
    attribute is ignored by Python 3.
    """

    def __init__(self):
        """Read the connection settings and prepare an (unconnected) SSH client."""
        self.hostname = GERRIT_HOSTNAME
        self.gerriturl = GERRIT_URL
        self.port = GERRIT_PORT
        self.username = G_USERNAME
        self.passwd = G_PASSWORD
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts any host key on first contact.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.logger = logging.getLogger("subscribe")

    @abstractmethod
    def get_stream_event(self):
        """Connect to Gerrit and process one kind of stream event."""

    def connectGerrit(self):
        """Open an SSH session to Gerrit and store it in ``self.channel``.

        Authenticates with the first file in ``./subscribe/gerrit/`` whose
        name ends with ``rsa``. The process exits with status 1 when the
        connection fails or when no such key file exists.
        """
        key_dir = "./subscribe/gerrit/"
        try:
            for key_name in os.listdir(key_dir):
                if not key_name.endswith("rsa"):
                    continue
                self.client.connect(
                    hostname=self.hostname,
                    port=self.port,
                    username=self.username,
                    pkey=paramiko.RSAKey(filename="./subscribe/gerrit/%s" % key_name),
                )
                self.channel = self.client.get_transport().open_session(timeout=5)
                self.logger.info("connect to gerrit ok")
                return
        except Exception as e:
            self.logger.error("connect to gerrit failed, %s" % e)
            sys.exit(1)
        # No key file matched: without this, self.channel would stay undefined
        # and the caller would fail later with an unrelated AttributeError.
        self.logger.error("connect to gerrit failed, %s" % ("no *rsa key file in %s" % key_dir))
        sys.exit(1)

    def getIssues(self, data):
        """Extract issue references from a commit message.

        Collects the text following every ``Issue: `` marker (up to the end
        of that line). When exactly one marker is found and its text is
        longer than 16 characters, the text is split on single spaces and the
        pieces are returned; otherwise the list of matches is returned as is.
        """
        matches = re.findall(r"Issue: (.*)?", data)
        if len(matches) == 1 and len(matches[0]) > 16:
            return matches[0].split(" ")
        return matches

    def getChangeLines(self, changeid):
        """Return the number of changed lines of the current revision of ``changeid``.

        For every file except ``/COMMIT_MSG`` the larger of
        ``lines_inserted`` and ``lines_deleted`` is added to the total; a
        missing counter counts as 0.
        """
        # NOTE(review): ssl_verify=False disables TLS certificate verification.
        gerrit_client = GerritClient(base_url=self.gerriturl, username=self.username, password=self.passwd, ssl_verify=False)
        files = gerrit_client.changes.get(changeid).get_revision("current").files.poll()
        change_line = 0
        for entry in files:
            if entry.get('path') == "/COMMIT_MSG":
                continue
            inserted = entry.get("lines_inserted")
            deleted = entry.get("lines_deleted")
            change_line += max(
                inserted if inserted is not None else 0,
                deleted if deleted is not None else 0,
            )
        return change_line


class CreatePatchSetFactory(GerritFactory):
    """Listens for ``patchset-created`` events and mirrors them into ``Git`` rows."""

    def get_stream_event(self):
        """Stream ``patchset-created`` events and store each one.

        Gerrit emits one JSON document per line, while ``recv`` returns
        arbitrary byte chunks; the chunks are therefore buffered and only
        complete lines are decoded. Any exception ends the listener after a
        warning is logged (this method itself does not reconnect). The
        channel is always closed on the way out.
        """
        # 1.connect
        self.connectGerrit()
        # 2.listen
        pending = b""
        try:
            self.channel.exec_command("gerrit stream-events -s patchset-created")
            while not self.channel.exit_status_ready():
                chunk = self.channel.recv(1024000)
                if not chunk:
                    # Empty read: the remote side closed the stream.
                    break
                pending += chunk
                # Everything before the last newline is complete; the
                # remainder is the start of the next event.
                *lines, pending = pending.split(b"\n")
                for line in lines:
                    if line.strip():
                        self._save_event(json.loads(line))
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting" % e)
        finally:
            channel = getattr(self, "channel", None)
            if channel is not None:
                channel.close()

    def _save_event(self, data):
        """Create or update the ``Git`` row described by one decoded event."""
        self.logger.info("patchset-created: \n%s" % data)

        change = data['change']
        patch_set = data['patchSet']
        time_format = "%Y-%m-%d %H:%M:%S"

        changeid = change['id']
        numberid = change['number']
        new_revision = patch_set['revision']
        repo = change['project']
        branch = change['branch']
        parent = "" if len(patch_set['parents']) == 0 else patch_set['parents'][0]
        subject = change['subject']
        topic = change.get("topic", None)
        status = change['status']
        commit_message = change['commitMessage']
        issue_list = json.dumps(self.getIssues(commit_message))
        author = change['owner']['username']
        commit_date = datetime.datetime.fromtimestamp(change['createdOn']).strftime(time_format)
        update_date = datetime.datetime.fromtimestamp(data['eventCreatedOn']).strftime(time_format)
        change_line = self.getChangeLines(changeid)

        # 1.check numberid exist?
        close_old_connections()
        try:
            # Single lookup instead of exists() + get().
            upobj = Git.objects.get(numberid=numberid)
        except Git.DoesNotExist:
            upobj = None

        if upobj is not None:
            # update
            upobj.new_revision = new_revision
            upobj.subject = subject
            upobj.update_date = update_date
            upobj.commit_message = commit_message
            upobj.status = status
            upobj.issue_list = issue_list
            upobj.change_line = change_line
            upobj.save()
            self.logger.info("patchset-created-db: %s %s update ok" % (numberid, changeid))
            return

        # create
        new_data = {
            "changeid": changeid,
            "numberid": numberid,
            "new_revision": new_revision,
            "repo": repo,
            "branch": branch,
            "parent": parent,
            "topic": topic,
            "subject": subject,
            "old_subject": "xxxxxx",
            "status": status,
            "commiter": "xxxxxx",
            "reviewer": "xxxxxx",
            "update_date": update_date,
            "issue_list": issue_list,
            "commit_message": commit_message,
            "author": author,
            "commit_date": commit_date,
            "change_line": change_line,
            "submit_date": None,
            "submitter": None,
            "update_msg": None,
        }
        pc_serial = GitSerializer(data=new_data)
        if pc_serial.is_valid():
            pc_serial.save()
            self.logger.info("patchset-created-db: %s %s create ok" % (numberid, changeid))
        else:
            self.logger.error("patchset-created-db: %s %s create failed %s" % (numberid, changeid, pc_serial.errors))


class ChangeAbandoneFactory(GerritFactory):
    """Listens for ``change-abandoned`` events and updates the matching ``Git`` row."""

    def get_stream_event(self):
        """Stream ``change-abandoned`` events and apply each one.

        Gerrit emits one JSON document per line, while ``recv`` returns
        arbitrary byte chunks; the chunks are therefore buffered and only
        complete lines are decoded. Any exception ends the listener after a
        warning is logged (this method itself does not reconnect). The
        channel is always closed on the way out.
        """
        # 1.connect
        self.connectGerrit()
        # 2.listen
        pending = b""
        try:
            self.channel.exec_command("gerrit stream-events -s change-abandoned")
            while not self.channel.exit_status_ready():
                chunk = self.channel.recv(1024000)
                if not chunk:
                    # Empty read: the remote side closed the stream.
                    break
                pending += chunk
                # Everything before the last newline is complete; the
                # remainder is the start of the next event.
                *lines, pending = pending.split(b"\n")
                for line in lines:
                    if line.strip():
                        self._save_event(json.loads(line))
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting" % e)
        finally:
            channel = getattr(self, "channel", None)
            if channel is not None:
                channel.close()

    def _save_event(self, data):
        """Copy status and event time of one decoded event onto its ``Git`` row."""
        self.logger.info("change-abandoned: \n%s" % data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = datetime.datetime.fromtimestamp(data['eventCreatedOn']).strftime("%Y-%m-%d %H:%M:%S")
        status = data['change']['status']

        close_old_connections()
        try:
            # Single lookup instead of exists() + get().
            upobj = Git.objects.get(numberid=numberid)
        except Git.DoesNotExist:
            self.logger.error("change-abandoned-db: %s not exist in db" % numberid)
            return

        # update
        upobj.status = status
        upobj.update_date = update_date
        upobj.save()
        self.logger.info("change-abandoned-db: %s %s update ok" % (numberid, changeid))


class ChangeMergeFactory(GerritFactory):
    """Listens for ``change-merged`` events and updates the matching ``Git`` row."""

    def get_stream_event(self):
        """Stream ``change-merged`` events and apply each one.

        Gerrit emits one JSON document per line, while ``recv`` returns
        arbitrary byte chunks; the chunks are therefore buffered and only
        complete lines are decoded. Any exception ends the listener after a
        warning is logged (this method itself does not reconnect). The
        channel is always closed on the way out.
        """
        # 1.connect
        self.connectGerrit()
        # 2.listen
        pending = b""
        try:
            self.channel.exec_command("gerrit stream-events -s change-merged")
            while not self.channel.exit_status_ready():
                chunk = self.channel.recv(1024000)
                if not chunk:
                    # Empty read: the remote side closed the stream.
                    break
                pending += chunk
                # Everything before the last newline is complete; the
                # remainder is the start of the next event.
                *lines, pending = pending.split(b"\n")
                for line in lines:
                    if line.strip():
                        self._save_event(json.loads(line))
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting" % e)
        finally:
            channel = getattr(self, "channel", None)
            if channel is not None:
                channel.close()

    def _save_event(self, data):
        """Copy status, submitter and times of one decoded event onto its ``Git`` row."""
        self.logger.info("change-merged: \n%s" % data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = datetime.datetime.fromtimestamp(data['eventCreatedOn']).strftime("%Y-%m-%d %H:%M:%S")
        status = data['change']['status']
        submitter = data['submitter']['username']
        # The event time doubles as the submit time.
        submit_date = update_date

        close_old_connections()
        try:
            # Single lookup instead of exists() + get().
            upobj = Git.objects.get(numberid=numberid)
        except Git.DoesNotExist:
            self.logger.error("change-merged-db: %s not exist in db" % numberid)
            return

        # update
        upobj.status = status
        upobj.update_date = update_date
        upobj.submitter = submitter
        upobj.submit_date = submit_date
        upobj.save()
        self.logger.info("change-merged-db: %s %s update ok" % (numberid, changeid))


class ChangeRestoreFactory(GerritFactory):
    """Listens for ``change-restored`` events and updates the matching ``Git`` row."""

    def get_stream_event(self):
        """Stream ``change-restored`` events and apply each one.

        Gerrit emits one JSON document per line, while ``recv`` returns
        arbitrary byte chunks; the chunks are therefore buffered and only
        complete lines are decoded. Any exception ends the listener after a
        warning is logged (this method itself does not reconnect). The
        channel is always closed on the way out.
        """
        # 1.connect
        self.connectGerrit()
        # 2.listen
        pending = b""
        try:
            self.channel.exec_command("gerrit stream-events -s change-restored")
            while not self.channel.exit_status_ready():
                chunk = self.channel.recv(1024000)
                if not chunk:
                    # Empty read: the remote side closed the stream.
                    break
                pending += chunk
                # Everything before the last newline is complete; the
                # remainder is the start of the next event.
                *lines, pending = pending.split(b"\n")
                for line in lines:
                    if line.strip():
                        self._save_event(json.loads(line))
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting" % e)
        finally:
            channel = getattr(self, "channel", None)
            if channel is not None:
                channel.close()

    def _save_event(self, data):
        """Copy status and event time of one decoded event onto its ``Git`` row."""
        self.logger.info("change-restored: \n%s" % data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = datetime.datetime.fromtimestamp(data['eventCreatedOn']).strftime("%Y-%m-%d %H:%M:%S")
        status = data['change']['status']

        close_old_connections()
        try:
            # Single lookup instead of exists() + get().
            upobj = Git.objects.get(numberid=numberid)
        except Git.DoesNotExist:
            self.logger.error("change-restored: %s not exist in db" % numberid)
            return

        # update
        upobj.status = status
        upobj.update_date = update_date
        upobj.save()
        self.logger.info("change-restored-db: %s %s update ok" % (numberid, changeid))


class TopicChangeFactory(GerritFactory):
    """Listens for ``topic-changed`` events and updates the matching ``Git`` row."""

    def get_stream_event(self):
        """Stream ``topic-changed`` events and apply each one.

        Gerrit emits one JSON document per line, while ``recv`` returns
        arbitrary byte chunks; the chunks are therefore buffered and only
        complete lines are decoded. Any exception ends the listener after a
        warning is logged (this method itself does not reconnect). The
        channel is always closed on the way out.
        """
        # 1.connect
        self.connectGerrit()
        # 2.listen
        pending = b""
        try:
            self.channel.exec_command("gerrit stream-events -s topic-changed")
            while not self.channel.exit_status_ready():
                chunk = self.channel.recv(1024000)
                if not chunk:
                    # Empty read: the remote side closed the stream.
                    break
                pending += chunk
                # Everything before the last newline is complete; the
                # remainder is the start of the next event.
                *lines, pending = pending.split(b"\n")
                for line in lines:
                    if line.strip():
                        self._save_event(json.loads(line))
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting" % e)
        finally:
            channel = getattr(self, "channel", None)
            if channel is not None:
                channel.close()

    def _save_event(self, data):
        """Copy topic and event time of one decoded event onto its ``Git`` row."""
        self.logger.info("topic-change: \n%s" % data)

        numberid = data['change']['number']
        update_date = datetime.datetime.fromtimestamp(data['eventCreatedOn']).strftime("%Y-%m-%d %H:%M:%S")
        # The "topic" key is read with a None default, i.e. it may be absent.
        topic = data['change'].get("topic", None)

        close_old_connections()
        try:
            # Single lookup instead of exists() + get().
            upobj = Git.objects.get(numberid=numberid)
        except Git.DoesNotExist:
            self.logger.error("topic-change: %s not exist in db" % numberid)
            return

        # update
        upobj.topic = topic
        upobj.update_date = update_date
        upobj.save()
        self.logger.info("topic-change-db: %s update ok" % numberid)

3、启动服务

# Show the command's arguments and help text
python manage.py subscribe -h

# Start the listener for a single event type
python manage.py subscribe "patchset-created"

# Or start the listeners for all event types
python manage.py subscribe all


【转自:韩国lg机房 http://www.558idc.com/lg.html欢迎留下您的宝贵建议】
上一篇:django订阅jira webhook事件数据
下一篇:没有了
网友评论