- Flask常用组件
- 一、 flask_session
- 1、 常用配置
- 2、 使用方法
- 2.1 session_interface
- 2.2 config
- 二、 DBUtils
- 1、 引言
- 2、 DBUtils
- 2.1 模式一
- 2.2 模式二
- 3、 代码封装
- 4、 结合flask使用
- 4.1 方式一
- 4.2 方式二
- 4.3 方法三
- 三、 wtforms
- 1、 支持的字段
- 2、 字段参数
- 3、 验证函数
- 4、 代码实例
- 四、 flask_mail
- 1、 简介
- 2、 Mail
- 3、 Massage
- 3.1 实例化对象
- 3.2 类方法
- 4、 使用方法
- 五、 flask_script
- 1、 简介
- 2、 启动服务
- 3、 传入参数
- 六、 flask_sqlalchemy
- 1、 简介
- 2、 使用步骤
- 七、 flask_migrate
- 八、 自定义组件
- 九、 其它
- 1、 多app应用
- 2、 信号
- 一、 flask_session
session
是基于cookie实现, 保存在服务端的键值对(形式为 {随机字符串:'uuid'}
), 同时在浏览器中的cookie中也对应一相同的随机字符串,用来再次请求的 时候验证,这个组件的作用是将session数据存储到数据库中
# 对 session 进行保护
SECRET_KEY = os.urandom(32) # 创建密钥
SESSION_USE_SIGNER = True # 是否签名保护
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30) # 设置时间延迟
SESSION_REFRESH_EACH_REQUEST = True # 每刷新一次就更新一次session
SESSION_TYPE = "redis" # 设置redis存储session数据
SESSION_REDIS = Redis("127.0.0.1") # 连接数据库
SESSION_PREFIX = "MyWebPrefix:" # 设置session前缀,默认为session:
2、 使用方法 2.1 session_interface注意:
session
中存储的是字典,当修改字典内部(第二级)元素时,会造成数据不更新解决方法:
我们要先阅读一下部分源码
class SecureCookieSession(CallbackDict, SessionMixin): #: When data is changed, this is set to ``True``. Only the session modified = False # 只有当session里面的数据改变的时候,这个才会被设置成True #: When data is read or written, this is set to ``True``. Used by accessed = False def __init__(self, initial: t.Any = None) -> None: def on_update(self) -> None: self.modified = True # 如果数据更新,就会被改为True,后面就会对session设置到cookies中 """ if not (session.modified # 如果session修改,就会重新设置到cookies中 or ( session.permanent # 是否设置到cookie里面 and app.config["SESSION_REFRESH_EACH_REQUEST"] # 如果为True,则每次请求刷新一次 )): return ... response.set_cookie(...) """ self.accessed = True # 如果session已读,则将session写入response中 """ if session.accessed: response.vary.add("Cookie") """ # 将on_update()传递给CallbackDict super().__init__(initial, on_update)
解决方法:
方法一
session.modified = True # 手动使得session标记为session的内容被修改
方法二
app.config["SESSION_REFRESH_EACH_REQUEST"] = True # 每刷新一次就更新 # 同时,在登录成功之后,设置 session.permanent = True # 默认为 False,但是如果使用redis的话,就不需要设置,其默认为True
通过session_interface
来设置
from flask_session import RedisSessionInterface
from flask import Flask
from os import urandom
from redis import Redis
app = Flask(__name__)
app.serect_key = urandom(32) # 设置32位随机密钥
app.config["SESSION_USE_SIGNER"] = True
# 通过redis保存session
app.session_interface = RedisSessionInterface(
redis=Redis("127.0.0.1"), # 连接Redis数据库
key_prefix="flask_login", # 设置随机字符串的前缀,默认为session:
)
2.2 config默认session设置为
from flask.sessions import SecureCookieSessionInterface app.session_interface = SecureCookieSessionInterface()
修改后,就可以直接使用了,其使用方法和原先的一样
通过配置文件来设置
from flask_session import Session
from flask import Flask
from os import urandom
from redis import Redis
settings = {
"SESSION_TYPE": "redis", # 使用redis数据库连接
"SESSION_REDIS": Redis("127.0.0.1"), # 连接redis数据库
"SECRET_KEY": os.urandom(32), # 创建密钥
"SESSION_USE_SIGNER": True, # 是否签名保护
"SESSION_REFRESH_EACH_REQUEST": True, # 每刷新一次就更新数据
}
app.config.from_mapping(settings) # 设置配置
Session(app) # 内部封装了设置app.session_interface的方法
二、 DBUtils
1、 引言
当我们要对数据库进行操作时,可以这么干:
import pymysql
from functools import wraps
SQL_CONFIG = {
# 对数据库的配置
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
}
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
conn = pymysql.connect(**SQL_CONFIG) # 连接数据库
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 返回字典类型的SQL数据
obj = fun(sql, cursor, *args) # 将参数传入函数中,进行运行
conn.commit() # 提交事务
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj # 返回数据
return inner
class SQLTool:
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
if __name__ == '__main__':
obj = SQLTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)
2、 DBUtils问题来了,如果有很多连接的话,开启数据库再关闭是不是很麻烦呢?
- 我们可以使用数据库连接池:
DBUtils
使用数据库连接池
此连接池有两种模式:
- 为每一个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新发到连接池,供自己线程再次使用。当线程终止,连接自动关闭
- 创建一批连接到连接池,供所有线程共享使用(主要)
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
func()
2.2 模式二
import time
import pymysql
import threading
from dbutils.pooled_db import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
3、 代码封装
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# 使用单例模式创建一个数据库连接池
from dbutils.pooled_db import PooledDB
import pymysql
SQL_CONFIG = {
# 对数据库的配置
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
}
POOL_CONFIG = {
# 对数据库连接池的配置
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 0,
}
POOL = PooledDB(
creator=pymysql,
**SQL_CONFIG,
**POOL_CONFIG
)
def select_sql(type_):
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
if type_ == "sql":
"""如果通过数据库来获取值的话"""
conn = pymysql.connect(**SQL_CONFIG) # 连接数据库
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 返回字典类型的SQL数据
obj = fun(sql, cursor, *args) # 将参数传入函数中,进行运行
conn.commit() # 提交事务
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj # 返回数据
elif type_ == "pool":
# 通过数据库连接池取值
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj
raise ValueError("type_ value error, value = pool or sql")
return inner
return sql_outer
class SQLTool:
"""使用pymysql单线程连接"""
@staticmethod
@select_sql("sql")
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@select_sql("sql")
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@select_sql("pool")
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@select_sql("pool")
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
obj = DBUtilsTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)
4、 结合flask使用
4.1 方式一
无法创建的原因是flask的上下文管理机制,当flask
还没run()
的时候,无法访问current_app
里面的配置信息
将配置信息放到settings
配置文件中,再使用init_app
方法——模拟Session(app)
,把数据库连接池放到配置文件config
里面
在存放数据库连接池的文件sql.py
中:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
from dbutils.pooled_db import PooledDB
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
# 通过数据库连接池取值
POOL = current_app.config["SQL_POOL"] # 将其存储到配置文件中,防止连接池不存在
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池,模拟Session(app)"""
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
POOL = PooledDB(
**app.config["POOL_CONFIG"]
)
app.config["SQL_POOL"] = POOL
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
在settings.py
文件中:
import pymysql
class BaseConfig:
POOL_CONFIG = {
# 对数据库连接池的配置
"creator": pymysql,
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 4,
# 对数据库的配置
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
}
在manage.py
主运行程序中
from sql import DBUtilsTool
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.from_object("settings.BaseConfig")
# 模拟session_redis 将app传入,初始化数据库连接池
DBUtilsTool(app) # 在程序刚启动的时候,flask还没开始运行,就把配置文件放到其中,flask开启的时候,就已经将数据库连接池存储到了config中
return app
if __name__ == '__main__':
app = create_app()
app.run("0.0.0.0")
4.2 方式二
直接将数据库连接池添加到配置文件中
在sql.py
中
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
# 通过数据库连接池取值
POOL = current_app.config["SQL_POOL"] # 将其存储到配置文件中,防止连接池不存在
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
在settings.py
中
from dbutils.pooled_db import PooledDB
class BaseConfig:
SQL_POOL = PooledDB(
# 对数据库连接池的配置
"creator": pymysql,
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 4,
# 对数据库的配置
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
)
在manage.py
中
from sql import DBUtilsTool
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.from_object("settings.BaseConfig")
return app
if __name__ == '__main__':
app = create_app()
app.run("0.0.0.0")
4.3 方法三不建议使用方法二,其没有做到配置与程序分离的效果
使用pool.py
在里面创建POOL
from flask import current_app
POOL = PooledDB(
**current_app.config["POOL_CONFIG"]
)
在sql.py
中
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# from .pool import POOL # 注意不能在这里导入,因为app创建时,注册蓝图的时候,app并没有运行,故current_app里面没有配置信息
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
# 通过数据库连接池取值
from .pool import POOL # 但路由中调用的时候导入,app肯定运行起来了,故current_app里面肯定有配置信息
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close() # 关闭游标
conn.close() # 关闭连接
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
# print(*args)
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入
obj = cursor.fetchall() # 获取全部数据
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args) # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
obj = cursor.fetchone() # 获取第一条数据
return obj
要注意
current_app
的创建时期,其他的使用方法都一样
只要写原生SQL,就要使用数据库连接池
三、 wtforms作用:专门用于对python web
框架做表单验证
可以根据验证函数的源码,来自定义验证函数
4、 代码实例创建一个上传文件功能的app
,里面包含了对其工作流程的关键步骤分析,希望对理解有用
代码实例:
在app.py
中
from flask import Flask
from wtforms import FileField, Form, SubmitField, widgets, validators
from flask import (request, render_template, current_app)
from werkzeug.utils import secure_filename
import os
class UploadForm(Form):
"""创建上传文件的字段"""
# UploadForm.checkbox = UnboundField(BooleanField, *args, **kwargs, creation_counter=1)
file1 = FileField(
render_kw={"class": "file"},
widget=widgets.FileInput(),
validators=[
validators.DataRequired(message="请选择要上传的文件哦!")
]
)
# UploadForm.sbtn = UnboundField(SubmitField, *args, **kwargs, creation_counter=2)
sbtn = SubmitField(
render_kw={
"id": "upload_submit",
"value": "上传文件",
},
widget=widgets.SubmitInput(),
)
config = {
"UPLOAD_FOLDER": "upload/", # 设置存储文件的文件夹
}
app = Flask(__name__)
app.config.from_mapping(config)
@app.route("/upload", methods=['GET', 'POST'])
def upload():
# 用于专门处理提交的数据的路由
if request.method == "POST":
form = UploadForm(formdata=request.files) # 上传文件
if form.validate(): # 对数据进行验证
f = form.file1.data
f.save(os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(f.filename))) # 将文件名称安全处理
return render_template("index.html", form=form, msg=f"上传文件成功!上传的文件为{f.filename}")
else:
return render_template("index.html", form=form, msg="请选择文件哦!")
form = UploadForm()
"""
# 对该类创建时的源码分析
metaclass=FormMeta # 其为元类
# 故,创建对象时
1、 FormMeta.__call__
# 在 __call__ 方法里面进行的步骤
UploadForm._unbound_fields = None
UploadForm._wtforms_meta = None
_unbound_fields = [ # 其根据counter来排序
("checkbox": UnboundField(BooleanField, *args, **kwargs, creation_counter=1))
("sbtn": UnboundField(SubmitField, *args, **kwargs, creation_counter=1))
]
_wtforms_meta = type("Meta", tuple([DefaultMeta]), {}) # DefaultMeta = Form.Meta
= class Meta(DefaultMeta):
pass
2、 UploadForm.__new__
# 在 __new__ 方法里面进行的步骤,然后发现没有 __new__ 方法,除非自定义
pass
3、 UploadForm.__init__
# 执行 __init__ 方法
UploadForm()._fields = {
"file1": FileField(...),
"sbtn": SubmitField(...)
}
UploadForm().name=FileField(...)
UploadForm().sbtn=SubmitField(...)
"""
print(form.file1)
"""
# 访问类的属性时的源码分析
# 故,form.name 执行的是字段类中的 __str__ 方法
Field.__str__ -> return self() # 执行字段的 __call__ 方法
Field.__call__ -> return self.meta.render_field(self, kwargs)
DefaultMeta.render_field(self, kwargs) -> return field.widget(field, **render_kw) # 执行 widget 的 __call__ 方法
Input.__call__ -> return Markup("<input %s>" % self.html_params(name=field.name, **kwargs)) # 进行渲染
"""
for i in form:
print(i) # 发现i是可以遍历的,
"""
# 其内部有一个iter方法
BaseField.__iter__ -> iter(self._fields.values())
print(i) # 执行字段内部的 __str__ 方法
"""
return render_template("index.html", form=form)
if __name__ == '__main__':
app.run()
还有验证流程,可以自己尝试一下
在templates/index.html
中
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>文件上传</title>
</head>
<body>
<div class="uploadFile">
<h1>请选择要上传的文件</h1>
<form method="POST" enctype="multipart/form-data">
<p>{{ form.file1 }}</p>
<p style="color: red">{{ form.file1.errors[0] }}</p>
<p>{{ form.sbtn }}</p>
</form>
<p style="color: green">{{ msg }}</p>
</div>
</body>
</html>
四、 flask_mail
1、 简介
在web程序中,经常会需要发送电子邮件。比如,在用户注册账户时发送确认邮件;定期向用户发送热门内容或是促销信息等等。在Web程序中发送电子邮件并不复杂,借助扩展Flask-Mail或是第三方邮件服务,只需要几行代码就可以发送邮件
配置信息:
它管理电子邮件消息的要求。 类构造函数采用以下形式
Message(
subject='', # 设置标题
recipients=[], # 收件人
body=None, # 发内容
html=None,
sender=None, # 发件人
cc=None,
bcc=None,
attachments=None, # 附件
reply_to=None,
date=None,
charset=None,
extra_headers=None,
mail_options=None,
rcpt_options=None
)
3.2 类方法
attach()
- 向消息添加附件。 该方法采用以下参数:
filename
- 要附加的文件的名称content_type
- 文件的MIME类型data
- 原始文件数据disposition
- 内容处置,如果有的话
add_recipient()
- 向消息添加另一个收件人
-
初始化邮箱
from flask_mail import Mail, Message mail = Mail(app)
-
配置邮箱信息
class BaseConfig2(): MAIL_SERVER = "smtp.qq.com" # 设置SMTP服务器 MAIL_PORT = 465 # 设置端口 MAIL_USE_TLS = False # 是否使用TLSSL加密 MAIL_USER_SSL = True # 是否使用SSL加密 MAIL_USERNAME = "liu.zhong.kun@oxmail.com" # 邮箱 MAIL_PASSWORD = "xtisaddfdfntdcjf" # 密码 MAIL_DEFAULT_SENDER = "A.L.Kun<liu.zhong.kun@oxmail.com>" # 发件人
-
创建信息
msg = Message(subject="This is title", recipients="3500515050@qq.com") msg.body = "This is body"
-
发送信息
mail.send(msg)
总代码:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
__author__ = "A.L.Kun"
__file__ = "app_.py"
__time__ = "2022/6/24 19:56"
__email__ = "liu.zhong.kun@foxmail.com"
from settings import BaseConfig2
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config.from_object(BaseConfig2)
mail = Mail(app)
@app.route('/')
def hello():
msg = Message(subject="This is title", recipients="3500515050@qq.com")
msg.body = "This is body"
message.html = render_template('content.html') # 可以发送一个html页面
mail.send(msg)
return "信息发送完成!"
if __name__ == '__main__':
app.run(debug=True)
五、 flask_script
1、 简介
安装:pip install flask-script
通过使用Flask-Script扩展,我们可以在Flask服务器启动的时候,通过命令行的方式传入参数。而不仅仅通过app.run()方法中传参,比如我们可以通过python hello.py runserver –host ip地址,告诉服务器在哪个网络接口监听来自客户端的连接
2、 启动服务from flask import Flask
app = Flask(__name__)
"""使用flask-script启动项目"""
from flask_script import Manager
manager = Manager(app)
@app.route('/')
def index():
return 'hello world'
if __name__ == "__main__"
manager.run()
3、 传入参数在命令行输入
python manage.py runserver
,即可启动服务
from flaskScript import create_app
from flask_script import Manager
app = create_app() # 返回一个app对象
manager = Manager(app)
@manager.command
def custom(arg):
"""
这个方法可以接收从命令行输入的参数,位置参数
如:
运行:python manage.py custom 123
其会在控制台打印 123
"""
print(arg)
@manager.option("-n", "--name", dest='name')
@manager.option("-u", "--url", dest="url")
def cmd_(name, url):
"""
自定义命令,可选参数
:param name:从命令行传入的姓名
:param url: 从命令行传入的url
:return:
如:
输入:python manage.py cmd_ -n "lihua" -u "127.0.0.1"
则会输出:lihua 127.0.0.1
"""
print(name, url)
if __name__ == '__main__':
manager.run()
作用:
python manage.py runserver ...
python manage.py 自定义命令
sqlalchemy
这里面有sqlalchemy
的一些基本操作
flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作
安装:
pip install flask-sqlalchemy
配置参数
SQLALCHEMY_DATABASE_URI
连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS
一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO
调试设置为true
SQLALCHEMY_POOL_SIZE
数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT
连接超时时间
SQLALCHEMY_POOL_RECYCLE
自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW
控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS
如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
2、 使用步骤
-
配置数据库信息,在
settings.py
文件中添加SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:qew123@127.0.0.1:3306/flask1?charset=utf8" SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_ECHO = True
-
创建一个
app.py
文件,在里面添加from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # 实例化对象 app = Flask(__name__) app.config.from_file("settings.py") # 导入配置文件 db.init_app(app) # 初始化sqlalchemy,使用app里面的配置文件
-
编写数据库结构,创建一个
models.py
,与app.py
同级from app import db class Users(db.Model): __table__ = "user" id = db.Column(db.INTEGER, primary_key=True, autoincrement=True) name = db.Column(db.String(32))
-
在路由中使用
import models @app.route("/login", methods=["GET", "POST"]) def login(): data = db.session.query(models.Users).all() # 查找数据库里面的所有信息 print(data) db.session.remove() # 移除session return "Login"
扩展,离线脚本的使用
from app import app
with app.app_context():
pass # 里面可以运行flask运行时进行的操作
七、 flask_migrate
作用:做数据库迁移
其依赖于:flask-script/flask-sqlalchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_script import Manager
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///app.db" # 连接数据库
db = SQLAlchemy(app)
manager = Manager(app)
Migrate(app, db) # 实例化组件
"""
flask db init # 初始化表
flask db migrate # 将表在数据库中创建出来
flask db upgrade # 更新表的结构
"""
# Model
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128))
if __name__ == '__main__':
manager.run()
八、 自定义组件
在auto.py
中添加:
from flask import request, session, redirect
class Auth:
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
app.auto = self # 将这个类的信息全部写入app里面
self.app = app
self.app.before_request(self.check_login)
def check_login(self):
print("检测用户是否登录")
usr = session.get("usr")
print(usr)
def login(self, data):
"""创建session"""
session["usr"] = data
def login_out(self):
"""用户登出"""
del session["usr"]
使用这个组件时:
from auto import Auto
from flask import Flask
app = Flask(__name__)
at = Auto()
at.init_app(app)
################################################
from flask import current_app
cerrent_app.login_out() # 调用组件登出的功能
最后,总目录结构为:
点击我,查看源代码
九、 其它 1、 多app应用from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple
app01 = Flask("app01")
app02 = Flask("app02")
# //login -> 访问 app01下面的login
dm = DispatcherMiddleware(app01, {
"/app02": app02 # /appo2/index 访问app02下面的index
})
@app01.route("/login")
def login():
return "app01.login"
@app02.route("/index")
def index():
return "app02.index"
if __name__ == '__main__':
# 请求一旦进来会执行 run_simple 方法的第三个参数加括号:dm(),对象加括号会调用对象的__call__方法
run_simple("127.0.0.1", 5000, dm)
2、 信号
from flask import Flask, signals
app = Flask(__name__)
def func(*args, **kwargs):
print("请求开始,触发的app为", *args, **kwargs)
signals.request_started.connect(func) # 连接信号
"""
含有的全部信号
template_rendered
before_render_template
request_started
request_finished
request_tearing_down
got_request_exception
appcontext_tearing_down
appcontext_pushed
appcontext_popped
message_flashed
这里可以在源码中查看这些信号的触发条件,源码内部使用send方法触发信号,或者百度也OK
"""
@app.route("/")
def index():
return "首页面"
if __name__ == '__main__':
app.run(debug=True)
注意,信号和装饰器的区别是,信号无法控制程序的进行,其只是提供一个提示功能,在原来的基础增加额外的操作和值;而装饰器可以控制请求是否可以继续往后执行