一.threading.local 多个线程修改同一个数据,复制多份变量给每个线程用,为每个线程开辟一块空间进行数据存储 不用threading.local (多线程资源共享,sleep后执行取到的都是最后一次修改的值
一.threading.local
多个线程修改同一个数据,复制多份变量给每个线程用,为每个线程开辟一块空间进行数据存储
不用threading.local (多线程资源共享,sleep后执行取到的都是最后一次修改的值)
# 不用local from threading import Thread import time cxw = -1 def task(arg): global cxw cxw = arg # time.sleep(2) print(cxw) ? for i in range(10): t = Thread(target=task,args=(i,)) t.start()
###threading.local使用 (给每一个线程都指定了一个id存储各自修改的属性值,彼此数据隔离)
from threading import Thread from threading import local import time from threading import get_ident # 特殊的对象 cxw = local() def task(arg): # 对象.val = 1/2/3/4/5 cxw.value = arg time.sleep(2) print(cxw.value) for i in range(10): t = Thread(target=task,args=(i,)) t.start()
通过字典自定义threading.local(函数)
from threading import get_ident,Thread import time storage = {} def set(k,v): ident = get_ident() if ident in storage: storage[ident][k] = v else: storage[ident] = {k:v} def get(k): ident = get_ident() return storage[ident][k] def task(arg): set(‘val‘,arg) v = get(‘val‘) print(v) ? for i in range(10): t = Thread(target=task,args=(i,)) t.start()
面向对象版
from threading import get_ident,Thread import time class Local(object): storage = {} def set(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def get(self, k): ident = get_ident() return Local.storage[ident][k] obj = Local() def task(arg): obj.set(‘val‘,arg) v = obj.get(‘val‘) print(v) for i in range(10): t = Thread(target=task,args=(i,)) t.start()
通过setattr和getattr实现
from threading import get_ident,Thread import time class Local(object): storage = {} def __setattr__(self, k, v): ident = get_ident() if ident in Local.storage: Local.storage[ident][k] = v else: Local.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return Local.storage[ident][k] obj = Local() def task(arg): obj.val = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start()
每个对象有自己的存储空间(字典)
from threading import get_ident,Thread import time class Local(object): def __init__(self): object.__setattr__(self,‘storage‘,{}) self.storage={} def __setattr__(self, k, v): ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else: self.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return self.storage[ident][k] obj = Local() def task(arg): obj.val = arg obj.xxx = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start()
兼容线程和协程(源码到request中去看,看local的getattr,setattr)
try: from greenlet import getcurrent as get_ident except Exception as e: from threading import get_ident from threading import Thread import time class Local(object): def __init__(self): object.__setattr__(self,‘storage‘,{}) def __setattr__(self, k, v): ident = get_ident() if ident in self.storage: self.storage[ident][k] = v else: self.storage[ident] = {k: v} def __getattr__(self, k): ident = get_ident() return self.storage[ident][k] obj = Local() def task(arg): obj.val = arg obj.xxx = arg print(obj.val) for i in range(10): t = Thread(target=task,args=(i,)) t.start()
partial偏函数
#偏函数的第二个部分(可变参数),按原有函数的参数顺序进行补充,参数将作用在原函数上,最后偏函数返回一个新函数 from functools import partial def test(a,b,c,d): return a+b+c+d ? tes=partial(test,1,2) print(tes(3,4))
二.请求上下文
‘‘‘ 全局的变量 _request_ctx_stack = LocalStack() _app_ctx_stack = LocalStack() current_app = LocalProxy(_find_app) #local就是我们的partial(_lookup_req_object, "request") request = LocalProxy(partial(_lookup_req_object, "request")) session = LocalProxy(partial(_lookup_req_object, "session")) g = LocalProxy(partial(_lookup_app_object, "g")) self.wsgi_app(environ, start_response)源码: def wsgi_app(self, environ, start_response): #ctx是ResquestContext的对象,里面request ctx = self.request_context(environ) error = None try: try: #就是ctx放到了Local对象 ctx.push() #所有请求的执行函数的,包括请求扩展,真正的视图函数 response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise # 请求之后的函数 return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error) 1 ctx = self.request_context(environ) environ,请求相关的,ctx现在是包含request,session的RequestContext的对象 源码: 1.1RequestContext(self, environ) self ,是app对象 environ,请求相关的 1.2 RequestContext在实例化的时候的源码: def __init__(self, app, environ, request=None, session=None): self.app = app if request is None: request = app.request_class(environ) self.request = request self.url_adapter = None try: self.url_adapter = app.create_url_adapter(self.request) except HTTPException as e: self.request.routing_exception = e self.flashes = None self.session = session self._implicit_app_ctx_stack = [] self.preserved = False self._after_request_functions = [] 这个RequestContext对象封装了,request 和seesoin 2 ctx.push()这个ctx是RequestContext,那就执行RequestContext.push方法 2.1RequestContext.push()的源码 def push(self): #_request_ctx_stack是localStack的对象 #self是ctx,把self也就ctx放入到local对象里面 _request_ctx_stack.push(self) if self.session is None: session_interface = self.app.session_interface self.session = session_interface.open_session(self.app, self.request) if self.session is None: self.session = session_interface.make_null_session(self.app) if self.url_adapter is not None: self.match_request() 2.1.1 _request_ctx_stack.push(self)现在的self是ctx 2.1.2 _request_ctx_stack是LocalStack()的对象 2.1.3 LocalStack()的push把ctx传过来 2.1.4 LocalStack()的push方法 源码: #obj是ctx def push(self, obj): #obj是ctx,requestContext的对象 rv = getattr(self._local, "stack", None) if rv is None: # self._local是Local()的对象 # storage[“线程id或者协程id”][stack] = [ctx,] self._local.stack = rv = [] rv.append(obj) return rv 2的最终也就是ctx.push()他的最终目的:把当前的ctx放入到Local()里面 3 response = self.full_dispatch_request() 源码: def full_dispatch_request(self): #这是服务器第一次请求时候执行的函数 self.try_trigger_before_first_request_functions() try: request_started.send(self) #执行请求之前所有的函数,并且拿到请求之前的返回值 rv = self.preprocess_request() if rv is None: #这个是真正视图函数,如果我的请求之前函数没有返回值才会执行 rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) return self.finalize_request(rv) 3.1 return self.finalize_request(rv)的源码: def finalize_request(self, rv, from_error_handler=False): response = self.make_response(rv) try: #请求之后的函数,after_request response = self.process_response(response) request_finished.send(self, response=response) except Exception: if not from_error_handler: raise self.logger.exception( "Request finalizing failed with an error while handling an error" ) return response 4 我们的现在已经在2步的时候把我们request已经方法Locald对象中了,我们第三步的任意一个地方 都能使用我们的request,session,那他是怎么获取的? 4.1 我们在flask导入request,这个request是一个全局的变量,我们怎么通过request区分我当前的request对象(environ) 我们发现request是LocalProxy的对象 4.2 当我们用全局的request.属性的时候,就会去找LocalProxy的对象,但是我们发现里面根本就没有 那他一定执行LocalProxy对象的__getattr__方法 4.3 我们现在来看LocalProxy对象的__getattr__方法的源码: #name我们要获取属性名 def __getattr__(self, name): if name == "__members__": return dir(self._get_current_object()) #form #self._get_current_object()就是ctx里面的request, return getattr(self._get_current_object(), name) 4.3.1 通过反射self._get_current_object()对象,来找我们属性,也就是name self._get_current_object()的源码: def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): return self.__local() try: #self.__local就实例化传过来的偏函数, return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) 4.3.1.1 return getattr(self.__local, self.__name__)那这里self.__local是谁? def __init__(self, local, name=None): object.__setattr__(self, "_LocalProxy__local", local) self.___local为local 这个local为实例化的时候传的 4.3.1.1.1 这个实例化的时候的操作 request = LocalProxy(partial(_lookup_req_object, "request")) 4.3.1.1的local就是 partial(_lookup_req_object, "request")的地址 4.3.1.1.2 _lookup_req_object的源码: #调用的时候 partial(_lookup_req_object, "request") #现在的name就是"request" def _lookup_req_object(name): # top是当前线程的ctx top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #找top里面的request # ctx找request return getattr(top, name) 4.3.1.1.2 我们来看这个_request_ctx_stack.top的top方法 def top(self): try: return self._local.stack[-1] except (AttributeError, IndexError): return None 我们发现这个self._local是Local()对象,这样就把ctx拿到了 ‘‘‘
三.蓝图
对程序进行目录结构划分
不使用蓝图,自己分文件
-templates
-views
-__init__.py
-user.py
-order.py
-app.py
app.py
from views import app if __name__ == ‘__main__‘: app.run()
init.py
from flask import Flask,request app = Flask(__name__) #不导入这个不行 from . import account from . import order from . import user
user.py
from . import app @app.route(‘/user‘) def user(): return ‘user‘
order.py
from . import app @app.route(‘/order‘) def order(): return ‘order‘
使用蓝图之中小型系统
详见代码:pro_flask_简单应用程序目录示例.zip
目录结构:
-flask_pro
-flask_test
-__init__.py
-static
-templates
-views
-order.py
-user.py
-manage.py
_init.py
from flask import Flask app=Flask(__name__) from flask_test.views import user from flask_test.views import order app.register_blueprint(user.us) app.register_blueprint(order.ord)
manage.py
from flask_test import app if __name__ == ‘__main__‘: app.run(port=8008)
user.py
from flask import Blueprint us=Blueprint(‘user‘,__name__) ? @us.route(‘/login‘) def login(): return ‘login‘
order.py
from flask import Blueprint ord=Blueprint(‘order‘,__name__) ? @ord.route(‘/test‘) def test(): return ‘order test‘
使用蓝图之大型系统
详见代码:pro_flask_大型应用目录示例.zip 总结: 1 xxx = Blueprint(‘account‘, __name__,url_prefix=‘/xxx‘) :蓝图URL前缀,表示url的前缀,在该蓝图下所有url都加前缀 2 xxx = Blueprint(‘account‘, name,url_prefix=‘/xxx‘,template_folder=‘tpls‘):给当前蓝图单独使用templates,向上查找,当前找不到,会找总templates 3 蓝图的befort_request,对当前蓝图有效 4 大型项目,可以模拟出类似于django中app的概念
四.g对象
from flask import Flask,views,session,request,g app=Flask(__name__) ‘‘‘‘ session 只要设置,在任意请求中都能拿到,无论你拿多少次 flash 一旦设置,可在任意一次请求中获取,但是只能取一次 g 一旦设置,只能在当前请求中获取,其它的请求都不能获取 ‘‘‘ @app.after_request def test(response): print(g.name) return response @app.route("/") def index(): g.name = "suv" return "ok" @app.route("/index") def index1(): return "ojbk" if __name__ == ‘__main__‘: app.run()
五.flask-session
作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy
安装:pip3 install flask-session
from flask import Flask,session from flask_session import RedisSessionInterface import redis app = Flask(__name__) conn=redis.Redis(host=‘127.0.0.1‘,port=6379) #use_signer是否对key签名 #如果use_siginer为False,这表示不需要配置app.secret_key
#permanent=False表示关闭浏览器cookie失效
app.secret_key="aksdhkajs" app.session_interface=RedisSessionInterface(conn,key_prefix=‘lqz‘, use_signer=True,permanent=False) ‘‘‘ 之前的session seesion名字为配置文件中的名字 存 seesion ->加密-->cookie 取 session -->值 ----》解密 redis的seesion: seesion名字为配置文件中的名字(默认为session)
前台cookie => session:session.id
redis => name:value name=self.key_prefix + session.sid, value=val 是redis的值 ‘‘‘ @app.route(‘/‘) def hello_world(): session[‘name‘]=‘lqz‘ return ‘Hello World!‘ @app.route("/index") def index(): print(session[‘name‘]) return "ok" if __name__ == ‘__main__‘: app.run()
第二种简便方式:
from flask import Flask,session from redis import Redis from flask_session import Session app = Flask(__name__) app.config[‘SESSION_TYPE‘] = ‘redis‘ app.config[‘SESSION_REDIS‘] = Redis(host=‘127.0.0.1‘,port=‘6379‘) Session(app) @app.route(‘/‘) def hello_world(): session[‘name‘]=‘lqz‘ return ‘Hello World!‘ @app.route("/index") def index(): print(session[‘name‘]) return "ok" if __name__ == ‘__main__‘: app.run()
六.信号
Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
安装:pip3 install blinker
内置信号: ```python request_started = _signals.signal(‘request-started‘) # 请求到来前执行 request_finished = _signals.signal(‘request-finished‘) # 请求结束后执行 before_render_template = _signals.signal(‘before-render-template‘) # 模板渲染前执行 template_rendered = _signals.signal(‘template-rendered‘) # 模板渲染后执行 got_request_exception = _signals.signal(‘got-request-exception‘) # 请求执行出现异常时执行 request_tearing_down = _signals.signal(‘request-tearing-down‘) # 请求执行完毕后自动执行(无论成功与否) appcontext_tearing_down = _signals.signal(‘appcontext-tearing-down‘)# 应用上下文执行完毕后自动执行(无论成功与否) appcontext_pushed = _signals.signal(‘appcontext-pushed‘) # 应用上下文push时执行 appcontext_popped = _signals.signal(‘appcontext-popped‘) # 应用上下文pop时执行 message_flashed = _signals.signal(‘message-flashed‘) # 调用flask在其中添加数据时,自动触发 ```
使用信号:
from flask import Flask,signals,render_template app = Flask(__name__) # 往信号中注册函数 def func12123(*args,**kwargs): print(‘触发型号‘,args,kwargs) signals.request_started.connect(func12123) # 触发信号: signals.request_started.send() @app.before_first_request def before_first1(*args,**kwargs): pass @app.before_first_request def before_first2(*args,**kwargs): pass @app.before_request def before_first3(*args,**kwargs): pass @app.route(‘/‘,methods=[‘GET‘,"POST"]) def index(): print(‘视图‘) return "ok" if __name__ == ‘__main__‘: app.wsgi_app app.run()
自定义信号:
from flask import Flask, current_app, flash, render_template from flask.signals import _signals app = Flask(import_name=__name__) # 自定义信号 xxxxx = _signals.signal(‘xxxxx‘) # 必须有一个位置参数,去接收他的发送者, def func(sender,name): print(sender) print(name) print("123") # 自定义信号中注册函数 xxxxx.connect(func) @app.route("/x") def index(): # 触发信号,这里的第一是发送者,第二个参数可选的话,必须是键值对 xxxxx.send("sb",name="wb") return ‘Index‘ if __name__ == ‘__main__‘: app.run()