pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。但目前pymysql支持python3.x而后者不支持3.x版本。 本文测试python版本:2.7.11。mysql版本:5.6.24 一、安装 pip3 install pymysql 二、使
pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。但目前pymysql支持python3.x而后者不支持3.x版本。
本文测试python版本:2.7.11。mysql版本:5.6.24
一、安装
pip3 install pymysql二、使用操作
#!/usr/bin/env pytho# -*- coding:utf-8 -*-
import pymysql
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1', charset='utf8')
# 创建游标
cursor = conn.cursor()
# 执行SQL,并返回收影响行数
effect_row = cursor.execute("select * from tb7")
# 执行SQL,并返回受影响行数
effect_row = cursor.execute("update tb7 set pass = '123' where nid = %s", (11,))
# 执行SQL,并返回受影响行数,执行多次
effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u1","u1pass","11111"),("u2","u2pass","22222")])
# 提交,不然无法保存新建或者修改的数据
conn.commit()
# 关闭游标
cursor.close()
# 关闭连接
conn.close()
注意:存在中文的时候,连接需要添加charset='utf8',否则中文显示乱码。
2、获取查询数据
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
cursor.execute("select * from tb7")
# 获取剩余结果的第一行数据
row_1 = cursor.fetchone()
print row_1
# 获取剩余结果前n行数据
row_2 = cursor.fetchmany(3)
# 获取剩余结果所有数据
row_3 = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
3、获取新创建数据自增ID
可以获取到最新自增的ID,也就是最后插入的一条数据ID
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
effect_row = cursor.executemany("insert into tb7(user,pass,licnese)values(%s,%s,%s)", [("u3","u3pass","11113"),("u4","u4pass","22224")])
conn.commit()
cursor.close()
conn.close()
#获取自增id
new_id = cursor.lastrowid
print new_id
4、移动游标
操作都是靠游标,那对游标的控制也是必须的
注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:cursor.scroll(1,mode='relative') # 相对当前位置移动
cursor.scroll(2,mode='absolute') # 相对绝对位置移动
5、fetch数据类型
关于默认获取的数据是元祖类型,如果想要或者字典类型的数据,即:
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select * from tb7")
row_1 = cursor.fetchone()
print row_1 #{u'licnese': 213, u'user': '123', u'nid': 10, u'pass': '213'}
conn.commit()
cursor.close()
conn.close()
6、调用存储过程
a、调用无参存储过程
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
#游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#无参数存储过程
cursor.callproc('p2') #等价于cursor.execute("call p2()")
row_1 = cursor.fetchone()
print row_1
conn.commit()
cursor.close()
conn.close()
b、调用有参存储过程
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.callproc('p1', args=(1, 22, 3, 4))
#获取执行完存储的参数,参数@开头
cursor.execute("select @p1,@_p1_1,@_p1_2,@_p1_3") #{u'@_p1_1': 22, u'@p1': None, u'@_p1_2': 103, u'@_p1_3': 24}
row_1 = cursor.fetchone()
print row_1
conn.commit()
cursor.close()
conn.close()
三、关于pymysql防注入
1、字符串拼接查询,造成注入
正常查询语句:
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1"
passwd="u1pass"
#正常构造语句的情况
sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)
#或者
#sql=select user,pass from tb7 where user='u1' and pass='u1pass'
row_count=cursor.execute(sql)
row_1 = cursor.fetchone()
print row_count,row_1
conn.commit()
cursor.close()
conn.close()
构造注入语句:
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1' or '1'-- "
passwd="u1pass"
sql="select user,pass from tb7 where user='%s' and pass='%s'" % (user,passwd)
#拼接语句被构造成下面这样,永真条件,此时就注入成功了。因此要避免这种情况需使用pymysql提供的参数化查询。
#select user,pass from tb7 where user='u1' or '1'-- ' and pass='u1pass'
row_count=cursor.execute(sql)
row_1 = cursor.fetchone()
print row_count,row_1
conn.commit()
cursor.close()
conn.close()
2、避免注入,使用pymysql提供的参数化语句
正常参数化查询
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1"
passwd="u1pass"
#执行参数化查询
row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
row_1 = cursor.fetchone()
print row_count,row_1
conn.commit()
cursor.close()
conn.close()
构造注入,参数化查询注入失败。
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
user="u1' or '1'-- "
passwd="u1pass"
#执行参数化查询
row_count=cursor.execute("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
#内部执行参数化生成的SQL语句,对特殊字符进行了加\转义,避免注入语句生成。
# sql=cursor.mogrify("select user,pass from tb7 where user=%s and pass=%s",(user,passwd))
# print sql
#select user,pass from tb7 where user='u1\' or \'1\'-- ' and pass='u1pass'被转义的语句。
row_1 = cursor.fetchone()
print row_count,row_1
conn.commit()
cursor.close()
conn.close()
结论:excute执行SQL语句的时候,必须使用参数化的方式,否则必然产生SQL注入漏洞。
3、使用存mysql存储过程动态执行SQL防注入
使用MYSQL存储过程自动提供防注入,动态传入SQL到存储过程执行语句。
定义动态调用sql的存储过程:
delimiter \\DROP PROCEDURE IF EXISTS proc_sql \\
CREATE PROCEDURE proc_sql (
in nid1 INT,
in nid2 INT,
in callsql VARCHAR(255)
)
BEGIN
set @nid1 = nid1;
set @nid2 = nid2;
set @callsql = callsql;
PREPARE myprod FROM @callsql;
-- PREPARE prod FROM 'select * from tb2 where nid>? and nid<?'; 传入的值为字符串,?为占位符
-- 用@p1,和@p2填充占位符
EXECUTE myprod USING @nid1,@nid2;
DEALLOCATE prepare myprod;
END\\
delimiter ;
定义动态调用sql的存储过程
在sql中调用:
set @nid1=12;set @nid2=15;
set @callsql = 'select * from tb7 where nid>? and nid<?';
CALL proc_sql(@nid1,@nid2,@callsql)
pymsql中调用 :
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1')
cursor = conn.cursor()
mysql="select * from tb7 where nid>? and nid<?"
cursor.callproc('proc_sql', args=(11, 15, mysql))
rows = cursor.fetchall()
print rows #((12, 'u1', 'u1pass', 11111), (13, 'u2', 'u2pass', 22222), (14, 'u3', 'u3pass', 11113))
conn.commit()
cursor.close()
conn.close()
四、使用with简化连接过程
每次都连接关闭很麻烦,使用上下文管理,简化连接过程
#! /usr/bin/env python# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pymysql
import contextlib
#定义上下文管理器,连接后自动关闭连接
@contextlib.contextmanager
def mysql(host='127.0.0.1', port=3306, user='root', passwd='', db='tkq1',charset='utf8'):
conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset=charset)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
try:
yield cursor
finally:
conn.commit()
cursor.close()
conn.close()
# 执行sql
with mysql() as cursor:
print(cursor)
row_count = cursor.execute("select * from tb7")
row_1 = cursor.fetchone()
print row_count, row_1
五、封装pymysql连接
a.封装函数执行sql语句获取结果
# encoding=utf-8import traceback
from pymysql import *
class MysqlHelper:
def __init__(self, host='10HW173532', user='finmsg_dev', password='zxcZXC$12', db='msgrule', charset='utf8',
connect_timeout=120): # 注意这里有默认值的变量一定要放在没有默认值变量的后面
self.host = host
self.user = user
self.password = password
self.db = db
self.charset = charset
self.connect_timeout = connect_timeout
def open(self):
self.conn = connect(host=self.host, user=self.user, password=self.password, db=self.db, charset=self.charset,
connect_timeout=self.connect_timeout)
self.cursor = self.conn.cursor()
print("数据库已连接")
def close(self):
self.cursor.close()
self.conn.close()
print("数据库已关闭")
def rollback(self):
self.conn.rollback()
self.cursor.close()
self.conn.close()
print("数据库执行回滚操作")
def search_one(self, sql, params=None):
try:
self.open()
self.cursor.execute(sql, params)
result = self.cursor.fetchone()
print("查询成功")
self.close()
return result
except Exception as e:
print(e)
print("查询失败")
def search_all(self, sql, params=None): # 查询获取多个值
try:
self.open()
self.cursor.execute(sql, params)
result = self.cursor.fetchall()
print("查询成功")
self.close()
return result
except Exception as e:
print(e)
return "查询失败"
def insert(self, sql, params=None): # 添加数据
try:
self.open()
self.cursor.execute(sql, params)
try:
self.conn.commit()
print('添加成功')
self.close()
return "添加成功"
except:
self.rollback()
return "添加失败"
except Exception as e:
print(e)
return "添加失败"
def update(self, sql, params=None): # 修改数据
try:
self.open()
self.cursor.execute(sql, params)
try:
self.conn.commit()
print('修改成功')
self.close()
return "修改成功"
except:
self.rollback()
print("修改失败")
return "修改失败"
except Exception as e:
print(e)
return "修改失败"
def delete(self, sql, params=None): # 删除数据
try:
self.open()
self.cursor.execute(sql, params)
try:
self.conn.commit()
print('删除成功')
self.close()
return "删除成功"
except:
self.rollback()
print('删除失败')
return "删除失败"
except Exception as e:
print(e)
return "删除失败"
def is_connected(self): # 检查数据库是否处于连接中
"""Check if the server is alive"""
try:
self.conn.ping(reconnect=True)
print("db is connecting")
except:
traceback.print_exc()
self.conn = self.open()
print("db reconnect")
b.封装函数获取数据库中的对象
"""作用:完成数据库相关工具类封装
分析:
1. 主要方法
get_sql_one(sql)
get_sql_all(sql)
2. 辅助方法
1. 获取连接对象
2. 获取游标对象
3. 关闭游标对象方法
4. 关闭连接对象方法
"""
from config.project import DataBase
from tools.read_config import ReadConfig
import pymysql
# 新建工具类 数据库
class ReadDB:
# 定义连接对象 类方法
conn = None
def __init__(self, database): # 构造函数-->注意这里有默认值的变量一定要放在没有默认值变量的后面
self.host = ReadConfig().read_config(section=database, option="host")
# 从config.ini配置文件读取出来的option为字符串类型,这里需要将port强转为int类型
self.port = int(ReadConfig().read_config(section=database, option="port"))
self.user = ReadConfig().read_config(section=database, option="user")
self.password = ReadConfig().read_config(section=database, option="password")
self.db_name = ReadConfig().read_config(section=database, option="db_name")
self.charset = 'utf8'
self.connect_timeout = 120
# 获取连对象方法封装
def get_conn(self):
if self.conn is None:
self.conn = pymysql.connect(host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.db_name,
charset=self.charset,
connect_timeout=self.connect_timeout)
print('数据库连接成功')
# 返回连接对象
return self.conn
# 获取游标对象方法封装
def get_cursor(self):
return self.get_conn().cursor()
# 将游标设置为字典类型
def get_dict_cursor(self):
return self.get_conn().cursor(cursor=pymysql.cursors.DictCursor)
# 关闭游标对象方法封装
@staticmethod
def close_cursor(cursor):
if cursor:
cursor.close()
# 关闭连接对象方法封装
def close_conn(self):
if self.conn:
self.conn.close()
# 注意:关闭连接对象后,对象还存在内存中,需要手工设置为None
self.conn = None
print("数据库关闭成功")
# 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python元组类型】
def get_sql_one(self, sql_statement):
# 定义游标对象及数据变量
cursor = None
data = None
try:
# 获取游标对象
cursor = self.get_cursor()
# 调用执行方法
cursor.execute(sql_statement)
# 获取结果
data = cursor.fetchone()
except Exception as e:
print("get_sql_one error:", e)
finally:
# 关闭游标对象
self.close_cursor(cursor)
# 关闭连接对象
self.close_conn()
# 返回执行结果
return data
# 主要 执行方法 获取单条结果 -> 在外界调用次方法就可以完成数据相应的操作【获取的数据库单条数据的格式为python字典类型】
def get_sql_byDict(self, sql_statement):
# 定义游标对象及数据变量
cursor = None
data = None
try:
# 获取游标对象
cursor = self.get_dict_cursor()
# 调用执行方法
cursor.execute(sql_statement)
# 获取结果
data = cursor.fetchone()
except Exception as e:
print("get_sql_one error:", e)
finally:
# 关闭游标对象
self.close_cursor(cursor)
# 关闭连接对象
self.close_conn()
# 返回执行结果
return data
# 获取指定条数的数据库结果集【获取的数据库多条数据集格式为元组嵌套元组】
def get_sql_many(self, sql_statement, n):
# 定义游标对象及数据变量
cursor = None
data = None
try:
# 获取游标对象
cursor = self.get_cursor()
# 调用执行方法
cursor.execute(sql_statement)
# 获取结果
data = cursor.fetchmany(n)
except Exception as e:
print("get_sql_one error:", e)
finally:
# 关闭游标对象
self.close_cursor(cursor)
# 关闭连接对象
self.close_conn()
# 返回执行结果
return data
# 获取 所有数据库结果集
def get_sql_all(self, sql_statement):
# 定义游标对象及数据变量
cursor = None
data = None
try:
# 获取游标对象
cursor = self.get_cursor()
# 调用执行方法
cursor.execute(sql_statement)
# 获取所有结果
data = cursor.fetchall()
except Exception as e:
print("get_sql_all error:", e)
finally:
# 关闭游标对象
self.close_cursor(cursor)
# 关闭连接对象
self.close_conn()
# 返回执行结果
return data
# 修改、删除、新增
def update_sql(self, sql_statement):
# 定义游标对象
cursor = None
effect_row = None
try:
# 获取游标对象
cursor = self.get_cursor()
# 调用执行方法,返回受影响行数
effect_row = cursor.execute(sql_statement)
# 提交事务
self.conn.commit()
except Exception as e:
# 事务回滚
self.conn.rollback()
print("get_sql_one error:", e)
finally:
# 关闭游标对象
self.close_cursor(cursor)
# 关闭连接对象
self.close_conn()
# 返回受影响行数
return effect_row
def is_connected(self): # 检查数据库是否处于连接中
"""Check if the server is alive"""
try:
self.conn.ping(reconnect=True)
print("db is connecting")
except Exception as error:
print("db connect error:%s" % error)
self.get_conn()
self.get_dict_cursor()
print("db reconnect")
if __name__ == '__main__':
sql = "SELECT * FROM bomt_clue_db.bo_clue_orig order by CREATE_TIME desc limit 1 "
print(ReadDB(database=DataBase.LeaveInformation).get_sql_byDict(sql))
去期待陌生,去拥抱惊喜。