- Flask常用组件
- 一、 flask_session
- 1、 常用配置
 - 2、 使用方法
- 2.1 session_interface
 - 2.2 config
 
 
 - 二、 DBUtils
- 1、 引言
 - 2、 DBUtils
- 2.1 模式一
 - 2.2 模式二
 
 - 3、 代码封装
 - 4、 结合flask使用
- 4.1 方式一
 - 4.2 方式二
 - 4.3 方法三
 
 
 - 三、 wtforms
- 1、 支持的字段
 - 2、 字段参数
 - 3、 验证函数
 - 4、 代码实例
 
 - 四、 flask_mail
- 1、 简介
 - 2、 Mail
 - 3、 Massage
- 3.1 实例化对象
 - 3.2 类方法
 
 - 4、 使用方法
 
 - 五、 flask_script
- 1、 简介
 - 2、 启动服务
 - 3、 传入参数
 
 - 六、 flask_sqlalchemy
- 1、 简介
 - 2、 使用步骤
 
 - 七、 flask_migrate
 - 八、 自定义组件
 - 九、 其它
- 1、 多app应用
 - 2、 信号
 
 
 - 一、 flask_session
 
 session 是基于cookie实现, 保存在服务端的键值对(形式为 {随机字符串:'uuid'}), 同时在浏览器中的cookie中也对应一相同的随机字符串,用来再次请求的 时候验证,这个组件的作用是将session数据存储到数据库中
# 对 session 进行保护
SECRET_KEY = os.urandom(32)  # 创建密钥
SESSION_USE_SIGNER = True  # 是否签名保护
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)  # 设置时间延迟
SESSION_REFRESH_EACH_REQUEST = True  # 每刷新一次就更新一次session
SESSION_TYPE = "redis"  # 设置redis存储session数据
SESSION_REDIS = Redis("127.0.0.1")  # 连接数据库
SESSION_PREFIX = "MyWebPrefix:"  # 设置session前缀,默认为session:
2、 使用方法 2.1 session_interface注意:
session中存储的是字典,当修改字典内部(第二级)元素时,会造成数据不更新解决方法:
我们要先阅读一下部分源码
class SecureCookieSession(CallbackDict, SessionMixin): #: When data is changed, this is set to ``True``. Only the session modified = False # 只有当session里面的数据改变的时候,这个才会被设置成True #: When data is read or written, this is set to ``True``. Used by accessed = False def __init__(self, initial: t.Any = None) -> None: def on_update(self) -> None: self.modified = True # 如果数据更新,就会被改为True,后面就会对session设置到cookies中 """ if not (session.modified # 如果session修改,就会重新设置到cookies中 or ( session.permanent # 是否设置到cookie里面 and app.config["SESSION_REFRESH_EACH_REQUEST"] # 如果为True,则每次请求刷新一次 )): return ... response.set_cookie(...) """ self.accessed = True # 如果session已读,则将session写入response中 """ if session.accessed: response.vary.add("Cookie") """ # 将on_update()传递给CallbackDict super().__init__(initial, on_update)解决方法:
方法一
session.modified = True # 手动使得session标记为session的内容被修改方法二
app.config["SESSION_REFRESH_EACH_REQUEST"] = True # 每刷新一次就更新 # 同时,在登录成功之后,设置 session.permanent = True # 默认为 False,但是如果使用redis的话,就不需要设置,其默认为True
通过session_interface来设置
from flask_session import RedisSessionInterface
from flask import Flask
from os import urandom
from redis import Redis
app = Flask(__name__)
app.serect_key = urandom(32)  # 设置32位随机密钥
app.config["SESSION_USE_SIGNER"] = True  
# 通过redis保存session
app.session_interface = RedisSessionInterface(
    redis=Redis("127.0.0.1"),  # 连接Redis数据库
    key_prefix="flask_login",  # 设置随机字符串的前缀,默认为session:
)
2.2 config默认session设置为
from flask.sessions import SecureCookieSessionInterface app.session_interface = SecureCookieSessionInterface()修改后,就可以直接使用了,其使用方法和原先的一样
通过配置文件来设置
from flask_session import Session
from flask import Flask
from os import urandom
from redis import Redis
settings = {
    "SESSION_TYPE": "redis",  # 使用redis数据库连接
    "SESSION_REDIS": Redis("127.0.0.1"),  # 连接redis数据库
    "SECRET_KEY": os.urandom(32),  # 创建密钥
    "SESSION_USE_SIGNER": True,  # 是否签名保护
    "SESSION_REFRESH_EACH_REQUEST": True, # 每刷新一次就更新数据
}
app.config.from_mapping(settings)  # 设置配置
Session(app)  # 内部封装了设置app.session_interface的方法
二、 DBUtils
1、 引言
当我们要对数据库进行操作时,可以这么干:
import pymysql
from functools import wraps
SQL_CONFIG = {
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
}
def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
        obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
        conn.commit()  # 提交事务
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj  # 返回数据
    return inner
class SQLTool:
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
if __name__ == '__main__':
    obj = SQLTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
    print(obj)
2、 DBUtils问题来了,如果有很多连接的话,开启数据库再关闭是不是很麻烦呢?
- 我们可以使用数据库连接池:
 DBUtils
使用数据库连接池
此连接池有两种模式:
- 为每一个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新发到连接池,供自己线程再次使用。当线程终止,连接自动关闭
 - 创建一批连接到连接池,供所有线程共享使用(主要)
 
POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)
def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()
func()
2.2 模式二
import time
import pymysql
import threading
from dbutils.pooled_db import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)
def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()
    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()
func()
3、 代码封装
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# 使用单例模式创建一个数据库连接池
from dbutils.pooled_db import PooledDB
import pymysql
SQL_CONFIG = {
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
    "charset": "utf8",
}
POOL_CONFIG = {
    #  对数据库连接池的配置
    "maxconnections": 6,
    "mincached": 2,
    "maxcached": 5,
    "maxshared": 3,
    "blocking": True,
    "maxusage": None,
    "setsession": [],
    "ping": 0,
}
POOL = PooledDB(
    creator=pymysql,
    **SQL_CONFIG,
    **POOL_CONFIG
)
def select_sql(type_):
    def sql_outer(fun):
        """使用装饰器,可以对数据库便捷操作"""
        @wraps(fun)
        def inner(sql, *args):
            if type_ == "sql":
                """如果通过数据库来获取值的话"""
                conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
                obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
                conn.commit()  # 提交事务
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj  # 返回数据
            elif type_ == "pool":
                # 通过数据库连接池取值
                conn = POOL.connection()
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
                obj = fun(sql, cursor, *args)
                conn.commit()
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj
            raise ValueError("type_ value error, value = pool or sql")
        return inner
    return sql_outer
class SQLTool:
    """使用pymysql单线程连接"""
    @staticmethod
    @select_sql("sql")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @select_sql("sql")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @select_sql("pool")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @select_sql("pool")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
obj = DBUtilsTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)
4、 结合flask使用
4.1 方式一
无法创建的原因是flask的上下文管理机制,当flask还没run()的时候,无法访问current_app里面的配置信息
将配置信息放到settings配置文件中,再使用init_app方法——模拟Session(app),把数据库连接池放到配置文件config里面
在存放数据库连接池的文件sql.py中:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
from dbutils.pooled_db import PooledDB
def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj
    return inner
class DBUtilsTool:
    """使用数据库连接池,模拟Session(app)"""
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)
    def init_app(self, app):
        POOL = PooledDB(
            **app.config["POOL_CONFIG"]
        )
        app.config["SQL_POOL"] = POOL
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
在settings.py文件中:
import pymysql
class BaseConfig:
    POOL_CONFIG = {
        #  对数据库连接池的配置
        "creator": pymysql,
        "maxconnections": 6,
        "mincached": 2,
        "maxcached": 5,
        "maxshared": 3,
        "blocking": True,
        "maxusage": None,
        "setsession": [],
        "ping": 4,
        # 对数据库的配置
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "passwd": "qwe123",
        "db": "flask1",
        "charset": "utf8",
    }
在manage.py主运行程序中
from sql import DBUtilsTool
from flask import Flask
def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    # 模拟session_redis 将app传入,初始化数据库连接池
    DBUtilsTool(app)  # 在程序刚启动的时候,flask还没开始运行,就把配置文件放到其中,flask开启的时候,就已经将数据库连接池存储到了config中
    return app
if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")
4.2 方式二
直接将数据库连接池添加到配置文件中
在sql.py中
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj
    return inner
class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
在settings.py中
from dbutils.pooled_db import PooledDB
class BaseConfig:
    SQL_POOL = PooledDB(
                #  对数据库连接池的配置
                "creator": pymysql,
                "maxconnections": 6,
                "mincached": 2,
                "maxcached": 5,
                "maxshared": 3,
                "blocking": True,
                "maxusage": None,
                "setsession": [],
                "ping": 4,
                # 对数据库的配置
                "host": "127.0.0.1",
                "port": 3306,
                "user": "root",
                "passwd": "qwe123",
                "db": "flask1",
                "charset": "utf8",
            )
在manage.py中
from sql import DBUtilsTool
from flask import Flask
def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    return app
if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")
4.3 方法三不建议使用方法二,其没有做到配置与程序分离的效果
使用pool.py在里面创建POOL
from flask import current_app
POOL = PooledDB(
                **current_app.config["POOL_CONFIG"]
            )
在sql.py中
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# from .pool import POOL  # 注意不能在这里导入,因为app创建时,注册蓝图的时候,app并没有运行,故current_app里面没有配置信息
def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        from .pool import POOL  # 但路由中调用的时候导入,app肯定运行起来了,故current_app里面肯定有配置信息
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj
    return inner
class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj
    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj
要注意
current_app的创建时期,其他的使用方法都一样
只要写原生SQL,就要使用数据库连接池
三、 wtforms作用:专门用于对python web框架做表单验证
可以根据验证函数的源码,来自定义验证函数
4、 代码实例创建一个上传文件功能的app,里面包含了对其工作流程的关键步骤分析,希望对理解有用
代码实例:
在app.py中
from flask import Flask
from wtforms import FileField, Form, SubmitField, widgets, validators
from flask import (request, render_template, current_app)
from werkzeug.utils import secure_filename
import os
class UploadForm(Form):
    """创建上传文件的字段"""
    # UploadForm.checkbox = UnboundField(BooleanField, *args, **kwargs, creation_counter=1)
    file1 = FileField(
        render_kw={"class": "file"},
        widget=widgets.FileInput(),
        validators=[
            validators.DataRequired(message="请选择要上传的文件哦!")
        ]
    )
    #  UploadForm.sbtn = UnboundField(SubmitField, *args, **kwargs, creation_counter=2)
    sbtn = SubmitField(
        render_kw={
            "id": "upload_submit",
            "value": "上传文件",
        },
        widget=widgets.SubmitInput(),
    )
config = {
    "UPLOAD_FOLDER": "upload/",  # 设置存储文件的文件夹
}
app = Flask(__name__)
app.config.from_mapping(config)
@app.route("/upload", methods=['GET', 'POST'])
def upload():
    # 用于专门处理提交的数据的路由
    if request.method == "POST":
        form = UploadForm(formdata=request.files)  # 上传文件
        if form.validate():  # 对数据进行验证
            f = form.file1.data
            f.save(os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(f.filename)))  # 将文件名称安全处理
            return render_template("index.html", form=form, msg=f"上传文件成功!上传的文件为{f.filename}")
        else:
            return render_template("index.html", form=form, msg="请选择文件哦!")
    form = UploadForm()
    """
    # 对该类创建时的源码分析
    metaclass=FormMeta  # 其为元类
    # 故,创建对象时
    1、 FormMeta.__call__
        # 在 __call__ 方法里面进行的步骤
        UploadForm._unbound_fields = None
        UploadForm._wtforms_meta = None
            _unbound_fields = [  # 其根据counter来排序
                ("checkbox": UnboundField(BooleanField, *args, **kwargs, creation_counter=1))
                ("sbtn": UnboundField(SubmitField, *args, **kwargs, creation_counter=1))
                ]
            _wtforms_meta =  type("Meta", tuple([DefaultMeta]), {})  # DefaultMeta = Form.Meta
                          =  class Meta(DefaultMeta): 
                                    pass
                                    
    2、 UploadForm.__new__
        # 在 __new__ 方法里面进行的步骤,然后发现没有 __new__ 方法,除非自定义
        pass
        
    3、 UploadForm.__init__
        # 执行 __init__ 方法
        UploadForm()._fields = {
                "file1": FileField(...),
                "sbtn": SubmitField(...)
            }
            UploadForm().name=FileField(...)
            UploadForm().sbtn=SubmitField(...)
    """
    print(form.file1)
    """
    # 访问类的属性时的源码分析
    # 故,form.name 执行的是字段类中的 __str__ 方法
    Field.__str__ -> return self()  # 执行字段的 __call__ 方法
    Field.__call__ -> return self.meta.render_field(self, kwargs)  
    DefaultMeta.render_field(self, kwargs)  -> return field.widget(field, **render_kw)    # 执行 widget 的 __call__ 方法
    Input.__call__ -> return Markup("<input %s>" % self.html_params(name=field.name, **kwargs))  # 进行渲染
    """
    for i in form:
        print(i)  # 发现i是可以遍历的,
        """
        # 其内部有一个iter方法
        BaseField.__iter__ -> iter(self._fields.values())
        print(i)  # 执行字段内部的 __str__ 方法
        """
    return render_template("index.html", form=form)
if __name__ == '__main__':
    app.run()
还有验证流程,可以自己尝试一下
在templates/index.html中
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>文件上传</title>
</head>
<body>
    <div class="uploadFile">
        <h1>请选择要上传的文件</h1>
        <form method="POST" enctype="multipart/form-data">
            <p>{{ form.file1 }}</p>
            <p style="color: red">{{ form.file1.errors[0] }}</p>
            <p>{{ form.sbtn }}</p>
        </form>
        <p style="color: green">{{ msg }}</p>
    </div>
</body>
</html>
四、 flask_mail
1、 简介
在web程序中,经常会需要发送电子邮件。比如,在用户注册账户时发送确认邮件;定期向用户发送热门内容或是促销信息等等。在Web程序中发送电子邮件并不复杂,借助扩展Flask-Mail或是第三方邮件服务,只需要几行代码就可以发送邮件
配置信息:
它管理电子邮件消息的要求。 类构造函数采用以下形式
Message(
    subject='',  # 设置标题
    recipients=[],  # 收件人
    body=None,  # 发内容
    html=None,
    sender=None,  # 发件人
    cc=None,
    bcc=None,
    attachments=None,  # 附件
    reply_to=None,  
    date=None,
    charset=None,
    extra_headers=None,
    mail_options=None,
    rcpt_options=None
)
3.2 类方法
attach() - 向消息添加附件。 该方法采用以下参数:
filename- 要附加的文件的名称content_type- 文件的MIME类型data- 原始文件数据disposition- 内容处置,如果有的话
add_recipient() - 向消息添加另一个收件人
- 
初始化邮箱
from flask_mail import Mail, Message mail = Mail(app) - 
配置邮箱信息
class BaseConfig2(): MAIL_SERVER = "smtp.qq.com" # 设置SMTP服务器 MAIL_PORT = 465 # 设置端口 MAIL_USE_TLS = False # 是否使用TLSSL加密 MAIL_USER_SSL = True # 是否使用SSL加密 MAIL_USERNAME = "liu.zhong.kun@oxmail.com" # 邮箱 MAIL_PASSWORD = "xtisaddfdfntdcjf" # 密码 MAIL_DEFAULT_SENDER = "A.L.Kun<liu.zhong.kun@oxmail.com>" # 发件人 - 
创建信息
msg = Message(subject="This is title", recipients="3500515050@qq.com") msg.body = "This is body" - 
发送信息
mail.send(msg) 
总代码:
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
__author__ = "A.L.Kun"
__file__ = "app_.py"
__time__ = "2022/6/24 19:56"
__email__ = "liu.zhong.kun@foxmail.com"
from settings import BaseConfig2
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config.from_object(BaseConfig2)
mail = Mail(app)
@app.route('/')
def hello():
    msg = Message(subject="This is title", recipients="3500515050@qq.com")
    msg.body = "This is body"
    message.html = render_template('content.html')  # 可以发送一个html页面
    mail.send(msg)
    return "信息发送完成!"
if __name__ == '__main__':
    app.run(debug=True)
五、 flask_script
1、 简介
安装:pip install flask-script
通过使用Flask-Script扩展,我们可以在Flask服务器启动的时候,通过命令行的方式传入参数。而不仅仅通过app.run()方法中传参,比如我们可以通过python hello.py runserver –host ip地址,告诉服务器在哪个网络接口监听来自客户端的连接
2、 启动服务from flask import Flask
app = Flask(__name__)
"""使用flask-script启动项目"""
from flask_script import Manager
manager = Manager(app)
@app.route('/')
def index():
    return 'hello world'
if  __name__ == "__main__"
    manager.run()
3、 传入参数在命令行输入
python manage.py runserver,即可启动服务
from flaskScript import create_app
from flask_script import Manager
app = create_app()  # 返回一个app对象
manager = Manager(app)
@manager.command
def custom(arg):
    """
    这个方法可以接收从命令行输入的参数,位置参数
    如:
        运行:python manage.py custom 123
        其会在控制台打印 123
    """
    print(arg)
@manager.option("-n", "--name", dest='name')
@manager.option("-u", "--url", dest="url")
def cmd_(name, url):
    """
    自定义命令,可选参数
    :param name:从命令行传入的姓名
    :param url: 从命令行传入的url
    :return:
    如:
        输入:python manage.py cmd_ -n "lihua" -u "127.0.0.1"
        则会输出:lihua 127.0.0.1
    """
    print(name, url)
if __name__ == '__main__':
    manager.run()
作用:
python manage.py runserver ...python manage.py 自定义命令
sqlalchemy这里面有sqlalchemy的一些基本操作
flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作
安装:
pip install flask-sqlalchemy
配置参数
SQLALCHEMY_DATABASE_URI
连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS
一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO
调试设置为true
SQLALCHEMY_POOL_SIZE
数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT
连接超时时间
SQLALCHEMY_POOL_RECYCLE
自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW
控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS
如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
2、 使用步骤
- 
配置数据库信息,在
settings.py文件中添加SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:qew123@127.0.0.1:3306/flask1?charset=utf8" SQLALCHEMY_TRACK_MODIFICATIONS = False SQLALCHEMY_ECHO = True - 
创建一个
app.py文件,在里面添加from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() # 实例化对象 app = Flask(__name__) app.config.from_file("settings.py") # 导入配置文件 db.init_app(app) # 初始化sqlalchemy,使用app里面的配置文件 - 
编写数据库结构,创建一个
models.py,与app.py同级from app import db class Users(db.Model): __table__ = "user" id = db.Column(db.INTEGER, primary_key=True, autoincrement=True) name = db.Column(db.String(32)) - 
在路由中使用
import models @app.route("/login", methods=["GET", "POST"]) def login(): data = db.session.query(models.Users).all() # 查找数据库里面的所有信息 print(data) db.session.remove() # 移除session return "Login" 
扩展,离线脚本的使用
from app import app
with app.app_context():
    pass  # 里面可以运行flask运行时进行的操作
七、 flask_migrate
作用:做数据库迁移
其依赖于:flask-script/flask-sqlalchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_script import Manager
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///app.db"  # 连接数据库
db = SQLAlchemy(app)  
manager = Manager(app)
Migrate(app, db)  # 实例化组件
"""
flask db init  # 初始化表
flask db migrate  # 将表在数据库中创建出来
flask db upgrade  # 更新表的结构
"""
# Model
class User(db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128))
if __name__ == '__main__':
    manager.run()
八、 自定义组件
在auto.py中添加:
from flask import request, session, redirect
class Auth:
    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.init_app(app)
    def init_app(self, app):
        app.auto = self  # 将这个类的信息全部写入app里面
        self.app = app
        self.app.before_request(self.check_login)
    def check_login(self):
        print("检测用户是否登录")
        usr = session.get("usr")
        print(usr)
    def login(self, data):
        """创建session"""
        session["usr"] = data
    def login_out(self):
        """用户登出"""
        del session["usr"]
使用这个组件时:
from auto import Auto
from flask import Flask
app = Flask(__name__)
at = Auto()
at.init_app(app)
################################################
from flask import current_app
cerrent_app.login_out()  # 调用组件登出的功能 
最后,总目录结构为:
点击我,查看源代码
九、 其它 1、 多app应用from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple
app01 = Flask("app01")
app02 = Flask("app02")
# //login -> 访问 app01下面的login
dm = DispatcherMiddleware(app01, {
    "/app02": app02  # /appo2/index  访问app02下面的index
})
@app01.route("/login")
def login():
    return "app01.login"
@app02.route("/index")
def index():
    return "app02.index"
if __name__ == '__main__':
    # 请求一旦进来会执行 run_simple 方法的第三个参数加括号:dm(),对象加括号会调用对象的__call__方法
    run_simple("127.0.0.1", 5000, dm)
2、 信号
from flask import Flask, signals
app = Flask(__name__)
def func(*args, **kwargs):
    print("请求开始,触发的app为", *args, **kwargs)
signals.request_started.connect(func)  # 连接信号
"""
含有的全部信号
template_rendered 
before_render_template 
request_started 
request_finished
request_tearing_down
got_request_exception 
appcontext_tearing_down
appcontext_pushed 
appcontext_popped 
message_flashed 
这里可以在源码中查看这些信号的触发条件,源码内部使用send方法触发信号,或者百度也OK
"""
@app.route("/")
def index():
    return "首页面"
if __name__ == '__main__':
    app.run(debug=True)
注意,信号和装饰器的区别是,信号无法控制程序的进行,其只是提供一个提示功能,在原来的基础增加额外的操作和值;而装饰器可以控制请求是否可以继续往后执行
