Defining models is similar to Django or SQLAlchemy 译文:定义模型类似于Django或SQLAlchemy (目录) 文档 github: https://github.com/coleifer/peewee 官方文档:http://docs.peewee-orm.com/ pypi https://pypi.org/project/peewe
Defining models is similar to Django or SQLAlchemy
译文:定义模型类似于Django或SQLAlchemy
(目录)
文档
- github: https://github.com/coleifer/peewee
- 官方文档:http://docs.peewee-orm.com/
- pypi https://pypi.org/project/peewee/
示例代码仓库
https://github.com/mouday/peewee-demo
安装
pip install peewee测试环境
$ python --version Python 3.7.0 $ pip show peewee Name: peewee Version: 3.15.31、数据库 Database
1.1、设置参数
# -*- coding: utf-8 -*- """ @File : database.py """ from peewee import SqliteDatabase import logging # 设置数据库 db = SqliteDatabase("demo.db") # 打印日志 logger = logging.getLogger('peewee') logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.DEBUG) logger.propagate = False # 不向上传播1.2、连接数据库
from app.database import db # 链接数据库 db.connect() # 断开数据库 if not db.is_closed(): db.close()1.3、执行原生sql
获取多条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,)) rows = cursor.fetchall() print(rows) [ (1, 'Jack', 23, '2022-10-19 18:09:07.038935', '2022-10-19 18:09:07.038940') ]获取单条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,)) # 将返回结果转换为dict # https://docs.python.org/zh-cn/3.6/library/sqlite3.html#sqlite3.Connection.row_factory def dict_factory(cursor, row): """将返回结果转换为dict""" d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d cursor.row_factory = dict_factory row = cursor.fetchone() print(row) { 'id': 1, 'name': 'Jack', 'age': 23, 'created_time': '2022-10-19 18:09:07.038935', 'update_time': '2022-10-19 18:09:07.038940' }2、模型 Model
2.1、定义模型
定义基类模型
# -*- coding: utf-8 -*- """ @File : base_model.py """ from peewee import Model from app.database import db class BaseModel(Model): """ # 基类,设置数据库链接 """ class Meta: database = db定义模型
# -*- coding: utf-8 -*- """ @File : user_model.py """ from datetime import datetime from peewee import CharField, DateTimeField, IntegerField from app.model.base_model import BaseModel class UserModel(BaseModel): """ 用户表 """ id = IntegerField(primary_key=True) name = CharField(null=False) age = IntegerField(null=False) created_time = DateTimeField(default=datetime.now) update_time = DateTimeField(default=datetime.now) class Meta: # 指定表名 table_name = 'tb_user'2.2、表操作
建表
UserModel.create_table() ( 'CREATE TABLE IF NOT EXISTS "tb_user" ( "id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL, "age" INTEGER NOT NULL, "created_time" DATETIME NOT NULL, "update_time" DATETIME NOT NULL)', [] )查看表是否存在
UserModel.table_exists() ( 'SELECT name FROM "main".sqlite_master WHERE type=? ORDER BY name', ('table',) )删除表
UserModel.drop_table() ( 'DROP TABLE IF EXISTS "tb_user"', [] )3、模型的CURD操作
3.1、写入操作
插入数据
ret = UserModel.insert({ UserModel.age: 20, UserModel.name: 'Tom' }).execute() 'INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', [ 'Tom', 20, datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), datetime.datetime(2022, 10, 19, 17, 28, 30, 198988) ]插入字典数据
ret = UserModel.insert({ 'age': 20, 'name': 'Tom' }).execute() 'INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', [ 'Tom', 20, datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), datetime.datetime(2022, 10, 19, 17, 28, 30, 198988) ]保存实例
user = UserModel( age=21, name='Tom' ) user.save() ('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', ['Charlie', 12, datetime.datetime(2022, 10, 19, 17, 34, 43, 376650), datetime.datetime(2022, 10, 19, 17, 34, 43, 376652)])插入并创建实例
user = UserModel.create( age=22, name='Tom' ) ('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', ['Charlie', 12, datetime.datetime(2022, 10, 19, 17, 36, 16, 408224), datetime.datetime(2022, 10, 19, 17, 36, 16, 408226)])插入多条数据
UserModel.insert_many([ { 'age': 23, 'name': 'Tom' }, { 'age': 24, 'name': 'Tom' } ]).execute() ('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?), (?, ?, ?, ?)', [ 'Tom', 23, datetime.datetime(2022, 10, 19, 17, 38, 48, 106336), datetime.datetime(2022, 10, 19, 17, 38, 48, 106344), 'Tom', 24, datetime.datetime(2022, 10, 19, 17, 38, 48, 106355), datetime.datetime(2022, 10, 19, 17, 38, 48, 106360)])3.2、更新数据
更新多条数据
UserModel.update( name='Jack' ).where( UserModel.id == 1 ).execute() ('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])更新单条数据
UserModel.set_by_id(1, {'name': 'Jack'}) ('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])3.3、删除数据
按照主键删除
UserModel.delete_by_id(1) ('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])按条件删除
UserModel.delete().where( UserModel.id == 1 ).execute() ('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])删除实例
user = UserModel.get_by_id(1) user.delete_instance() ('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])清空表数据
UserModel.truncate_table() ('DELETE FROM "tb_user"', [])3.4、取单条数据
条件查询一条
row = UserModel.select().where( UserModel.name == 'Tom' ).get() print(row) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Tom', 1, 0])获取第一条
row = UserModel.select().where( UserModel.name == 'Tom' ).first() print(row) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ?', ['Tom', 1])通过获取,不存在报错
row = UserModel.get(UserModel.name == 'Tom') print(row) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Tom', 1, 0])通过获取或者返回None
user = UserModel.get_or_none(UserModel.name == 'Jack') print(user) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) LIMIT ? OFFSET ?', ['Jack', 1, 0])通过主键获取,不存在报错
user = UserModel.get_by_id(1) print(user) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."id" = ?) LIMIT ? OFFSET ?', [1, 1, 0])获取或创建
UserModel.get_or_create(name='Tom', age=23) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE (("t1"."name" = ?) AND ("t1"."age" = ?)) LIMIT ? OFFSET ?', ['Tom', 23, 1, 0]) ('BEGIN', None) ('INSERT INTO "tb_user" ("name", "age", "created_time", "update_time") VALUES (?, ?, ?, ?)', ['Tom', 23, datetime.datetime(2022, 10, 19, 18, 9, 7, 38935), datetime.datetime(2022, 10, 19, 18, 9, 7, 38940)])3.5、取多条数据
查询多条记录
# 注意,获取的是 iterator # 可以转为 namedtuples(), tuples(), dicts() query = UserModel.select().where( UserModel.name == 'Tom' ) print(list(query)) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?)', ['Tom'])排序
query = UserModel.select().where( UserModel.name == 'Tom' ).order_by(UserModel.age.desc()) print(list(query)) # [<UserModel: 1>] ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" WHERE ("t1"."name" = ?) ORDER BY "t1"."age" DESC', ['Tom'])分页
query = UserModel.select().paginate(2, 10) print(list(query)) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" LIMIT ? OFFSET ?', [10, 10])统计
query = UserModel.select().count() print(list(query)) ('SELECT COUNT(1) FROM (SELECT 1 FROM "tb_user" AS "t1") AS "_wrapped"', [])分组
query = UserModel.select().group_by(UserModel.name) print(list(query)) ('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" FROM "tb_user" AS "t1" GROUP BY "t1"."name"', [])