python多线程并发操作数据库,会存在链接数据库超时、数据库连接丢失、数据库操作超时等问题 解决方法:使用数据库连接池,并且每次操作都从数据库连接池获取数据库操作句柄,操
python多线程并发操作数据库,会存在链接数据库超时、数据库连接丢失、数据库操作超时等问题
解决方法:使用数据库连接池,并且每次操作都从数据库连接池获取数据库操作句柄,操作完关闭连接返回数据库连接池
如果不使用数据库连接池, 直接使用多线程去操作数据库, 会遇到资源竞争, 争夺cursor游标, Thread对象的Lock和Rlock可以实现简单的线程同步,对cursor进行加锁,但是这个行为,反其道而行。 经测试加锁还不如不用多线程,而且执行速度会比正常慢
用于创建数据库连接池, 具体可以参考一下代码
import pymysqlfrom DBUtils.PooledDB import PooledDB
import traceback
from threading import Thread
# 基础类
class Base:
"""
用于连接和关闭
"""
def __init__(self):
self.pool = self.create_pool()
def create_pool(self):
"""
创建数据库连接池
:return: 连接池
"""
pool = PooledDB(creator=pymysql,
maxconnections=0, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=4, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 链接池中最多闲置的链接,0和None不限制
maxusage=1, # 一个链接最多被重复使用的次数,None表示无限制
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
host='127.0.0.1', # 此处必须是是127.0.0.1
port=3306,
user='root',
passwd='123456',
db='localhost',
use_unicode=True,
charset='utf8')
return pool
def save_mysql(self, sql, args):
"""
保存数据库
:param sql: 执行sql语句
:param args: 添加的sql语句的参数 list[tuple]
"""
try:
db = self.pool.connection() # 连接数据池
cursor = db.cursor() # 获取游标
cursor.executemany(sql, args)
db.commit()
cursor.close()
db.close()
except:
traceback.print_exc()
# 插入数据
def insertdata(self, insert, data):
args = [(id, name) for id, name in data if id and name]
self.save_mysql(insert, args)
def save_mysql(self, sql, args): 关键就在于这个方法,创建出连接池, 此时多线程每次写入需要连接池子,要从连接池中拿出连接,获取游标进行操作
下面代码 可以是同一个类, 也可以再你的业务代码中, 具体看个人 简单实现一下伪代码
分别测试了两次数据, 执行速度确实有了很大的提升
@util.fn_timerdef find_all_done(self):
# 运行时间 0:00:06.695223 运行时间 0:00:10.402396
insert1 = "INSERT INTO XXX(_id, title) VALUES (%s, %s)"
# 执行的插入方法
self.insertdata(insert=insert1, data=result_all_chinese)
insert2 = "INSERT INTO XXX(_id, title) VALUES (%s, %s)"
# 执行的插入方法
self.insertdata(insert=insert2, data=result_non_chinese)
# 执行多线程 运行时间 0:00:01.593220 运行时间 0:00:03.004854
try:
insert1 = "INSERT INTO xxx(_id, title) VALUES (%s, %s)"
insert2 = "INSERT INTO xxx(_id, title) VALUES (%s, %s)"
# target参数是线程运行方法名
t1 = Thread(target=self.insertdata, args=(insert1, result_all_chinese))
t2 = Thread(target=self.insertdata, args=(insert2, result_non_chinese))
t1.start()
t2.start()
except:
traceback.print_exc()