使用python的subprocess模块实现对SVN的相关操作。
设置GitSvn类,在该类下自定义执行SVN常规操作的方法。
SVN的常规操作包括:
(1)获取SVN当前版本,通过方法get_version()实现;
(2)下载SVN指定仓库,通过方法download()实现,实际是通过调用SVN的命令行操作指令svn checkout实现的下载整个仓库功能;
(3)获取SVN某个仓库下的所有文件列表,通过方法search_dir()实现,实际是通过调用SVN的命令行操作指令svn list实现的获取仓库文件列表功能;
(4)在SVN指定位置创建新的文件夹,通过方法mkdir_command()实现,实际是通过调用SVN的命令行操作指令svn mkdir实现的创建文件夹或者目录功能;
(5)将本地仓库文件添加到SVN仓库,通过方法upload_file()实现,实际是通过调用SVN的命令行操作指令svn add实现的添加文件功能;
(6)将本地仓库已添加的文件提交SVN指定仓库,通过方法commit_command()实现,实际是通过调用SVN的命令行操作指令svn commit实现的提交文件功能;
(7)删除SVN仓库的目录,通过方法delete_url()实现,实际是通过调用SVN的命令行操作指令svn delete实现的删除目录功能;
(8)锁定SVN仓库的文件,通过方法lock_file()实现,实际是通过调用SVN的命令行操作指令svn lock实现的锁定文件功能;
(9)将SVN仓库的文件解除锁定,通过方法unlock_file()实现,实际是通过调用SVN的命令行操作指令svn unlock实现的解除文件锁定功能;
(10)查看SVN仓库文件当前状态,通过方法check_status()实现,实际是通过调用SVN的命令行操作指令svn status实现的查看文件状态功能;
(11)更新SVN仓库的某个文件,通过方法update_file()实现,实际是通过调用SVN的命令行操作指令svn up实现的更新文件功能;
GitSvn类定义如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: Logintern09 import os import time import subprocess from log_manage import logger rq = time.strftime('%Y%m%d', time.localtime(time.time())) file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files") tempfile = os.path.join(file_path, rq + '.log') class GitSvn(object): def __init__(self, bill_addr): self.bill_addr = bill_addr def get_version(self): cmd = "svn info %s" % self.bill_addr logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") logger.info(result) error = error.decode(encoding="gbk") logger.error(error) with open(tempfile, "w") as f: f.write(result) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def download(self, dir_path): with open(tempfile, "r") as f: result = f.readlines() for line in result: if line.startswith("Revision"): laster_version = int(line.split(":")[-1].strip()) logger.info(line) break cur_path = os.path.dirname(os.path.realpath(__file__)) id_path = os.path.join(cur_path, default_sys_name) cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version) logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")): cur_path = os.path.dirname(os.path.realpath(__file__)) file_path = os.path.join(cur_path, default_sys_name) cmd = "svn cleanup" logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=file_path) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) def search_dir(self): cmd = "svn list %s" % self.bill_addr logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) with open(tempfile, "w") as f: f.write(result) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def mkdir_command(self, id_num): cmd = "svn mkdir %s" % id_num logger.info(cmd) cur_path = os.path.dirname(os.path.realpath(__file__)) file_path = os.path.join(cur_path, default_sys_name) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=file_path) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def upload_file(self, file_path): cmd = "svn add %s --force" % file_path root_path, file_name = os.path.split(file_path) logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=root_path) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def commit_command(self, file): # 执行了锁定的用户执行了提交操作(提交操作将自动解锁) if os.path.isfile(file): file_path, file_name = os.path.split(file) else: file_path = file cmd = "svn commit %s -m 'update_files'" % file logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=file_path) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) elif error.startswith("svn: E730053:"): res = "数据上传失败,请重新提交数据!!!" raise SystemError(res) else: res = "数据上传失败,请重新提交数据!!!" raise SystemError(res) def delete_url(self, url): cmd = "svn delete %s" % url logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def lock_file(self, file_name): cur_path = os.path.dirname(os.path.realpath(__file__)) id_path = os.path.join(cur_path, default_sys_name) file_path = os.path.join(id_path, file_name) cmd = "svn lock %s" % file_path logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) elif error.startswith("svn: warning: W160042:"): res = "系统资源已被其他用户锁定,请稍后重试!" raise SystemError(res) def unlock_file(self, file_name): # 不使用--force 参数 可以解锁被自己锁定的文集 即普通的release lock cur_path = os.path.dirname(os.path.realpath(__file__)) id_path = os.path.join(cur_path, default_sys_name) file_path = os.path.join(id_path, file_name) cmd = "svn unlock %s --force" % file_path logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) def check_status(self, file_name): # ?:不在svn的控制中;M:内容被修改;C:发生冲突;A:预定加入到版本库;K:被锁定 cur_path = os.path.dirname(os.path.realpath(__file__)) id_path = os.path.join(cur_path, default_sys_name) file_path = os.path.join(id_path, file_name) cmd = "svn status -v %s" % file_path logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) # 获取状态结果解析 status_list = result.split(" ") status_flag_list = [] for i in status_list: if i.isalpha(): status_flag_list.append(i) else: break if result.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) elif "C" in status_flag_list: return "C" elif "K" in status_flag_list: return "K" def update_file(self, file_name): cur_path = os.path.dirname(os.path.realpath(__file__)) id_path = os.path.join(cur_path, default_sys_name) file_path = os.path.join(id_path, file_name) cmd = "svn up %s" % file_path logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error) with open(tempfile, "w") as f: f.write(result) with open(tempfile, "r") as f: result = f.readlines() count = -1 for line in result: count += 1 if (line.strip() != "") and (count == 1): if line.startswith("C"): res = "更新系统资源时发现存在冲突,请稍后重试!" raise SystemError(res) if error.strip() != "": if error.startswith("svn: E170013:"): res = "网络异常,暂时连接不上系统,请检查网络或其他配置!" raise SystemError(res) elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")): cur_path = os.path.dirname(os.path.realpath(__file__)) file_path = os.path.join(cur_path, default_sys_name) cmd = "svn cleanup" logger.info(cmd) output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=file_path) (result, error) = output.communicate() result = result.decode(encoding="gbk") error = error.decode(encoding="gbk") logger.info(result) logger.error(error)
上述类GitSvn的应用实例如下:
if __name__ == '__main__': default_url = svn:*** # 用户本地SVN客户端的URL地址 git_class = GitSvn(default_url) git_class.get_version() git_class.download() # 验证查看目录文件列表功能 git_class.search_dir() # 验证删除目录功能 cur_path = os.path.dirname(os.path.realpath(__file__)) file_path = os.path.join(cur_path, default_sys_name) id_path = os.path.join(file_path, 'SCR202202100002') git_class.delete_url(id_path) # 验证文件加锁功能 git_class.lock_file(file_name="单号总表.xlsx") # 验证文件解锁功能 git_class.unlock_file(file_name="单号总表.xlsx") # 检查文件状态 git_class.check_status(file_name="单号总表.xlsx") # 验证创建目录功能 git_class.mkdir_command("SCR202203280001") # 验证提交文件功能 cur_path = os.path.dirname(os.path.realpath(__file__)) sys_path = os.path.join(cur_path, default_sys_name) target_dir = os.path.join(sys_path, "SCR202203280001") git_class.upload_file(target_dir) git_class.commit_command(target_dir)
到此这篇关于python调用subprocess模块实现命令行操作控制SVN的文章就介绍到这了,更多相关python subprocess模块内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!