当前位置 : 主页 > 编程语言 > python >

性能相关异步IO

来源:互联网 收集:自由互联 发布时间:2022-06-15
在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。 import requests def fetch_async(url): response = requests.get(url) return response

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。

性能相关异步IO_封装性能相关异步IO_异步io_02

import requests


def fetch_async(url):
    """Fetch *url* with a blocking GET and return the Response.

    Purely synchronous: the calling thread waits for the full round-trip,
    so a list of URLs is processed strictly one after another.
    """
    response = requests.get(url)
    return response


url_list = ['http://www.github.com', 'http://www.bing.com']

for url in url_list:
    fetch_async(url)

1.同步执行

同步

性能相关异步IO_封装性能相关异步IO_异步io_02

"""
可以实现并发
但是,请求发送出去后和返回之前,中间时期线程空闲
编写方式:
- 直接返回处理
- 通过回调函数处理
"""

########### 编写方式一 ###########
"""
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def task(url):
response = requests.get(url)
print(url,response)
# 写正则表达式


pool = ThreadPoolExecutor(7)
url_list = [
'http://www.***.com/wupeiqi',
'http://huaban.com/favorite/beauty/',
'http://www.bing.com',
'http://www.zhihu.com',
'http://www.sina.com',
'http://www.baidu.com',
'http://www.autohome.com.cn',
]
for url in url_list:
pool.submit(task,url)

pool.shutdown(wait=True)
"""

# ########### Variant 2: thread pool with a completion callback ###########
from concurrent.futures import ThreadPoolExecutor
import requests


def task(url):
    """
    Download the page at *url* in a worker thread.

    :param url: address to fetch
    :return: the requests Response object
    """
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    """Completion callback: runs when a submitted task finishes.

    Note: future.result() re-raises any exception from the worker,
    so a failed download surfaces here.
    """
    response = future.result()
    print(response.status_code, response.content)


pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.***.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    v = pool.submit(task, url)
    v.add_done_callback(done)

pool.shutdown(wait=True)

多线程执行

性能相关异步IO_封装性能相关异步IO_异步io_02

"""
可以实现并发
但是,请求发送出去后和返回之前,中间时期进程空闲
编写方式:
- 直接返回处理
- 通过回调函数处理
"""

########### 编写方式一 ###########
"""
from concurrent.futures import ProcessPoolExecutor
import requests
import time

def task(url):
response = requests.get(url)
print(url,response)
# 写正则表达式


pool = ProcessPoolExecutor(7)
url_list = [
'http://www.***.com/wupeiqi',
'http://huaban.com/favorite/beauty/',
'http://www.bing.com',
'http://www.zhihu.com',
'http://www.sina.com',
'http://www.baidu.com',
'http://www.autohome.com.cn',
]
for url in url_list:
pool.submit(task,url)

pool.shutdown(wait=True)
"""

# ########### Variant 2: process pool with a completion callback ###########
from concurrent.futures import ProcessPoolExecutor
import requests


def task(url):
    """Download *url* in a worker process and return the Response."""
    response = requests.get(url)
    return response


def done(future, *args, **kwargs):
    """Completion callback, invoked in the parent process once a task finishes."""
    response = future.result()
    print(response.status_code, response.content)


# The __main__ guard is mandatory for process pools on spawn-based
# platforms (Windows, macOS): without it each child re-executes the
# module top level and resubmits the work.
if __name__ == '__main__':
    pool = ProcessPoolExecutor(7)
    url_list = [
        'http://www.***.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    for url in url_list:
        v = pool.submit(task, url)
        v.add_done_callback(done)

    pool.shutdown(wait=True)

多进程

性能相关异步IO_封装性能相关异步IO_异步io_02

import asyncio

"""
@asyncio.coroutine
def task():
print('before...task......')
yield from asyncio.sleep(5) # 发送Http请求,支持TCP获取结果..
print('end...task......')


tasks = [task(), task()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
"""


"""
import asyncio


@asyncio.coroutine
def task(host, url='/'):
print('start',host,url)
reader, writer = yield from asyncio.open_connection(host, 80)

request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
request_header_content = bytes(request_header_content, encoding='utf-8')

writer.write(request_header_content)
yield from writer.drain()
text = yield from reader.read()
print('end',host, url, text)
writer.close()

tasks = [
task('www.***.com', '/wupeiqi/'),
task('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
"""

"""
import aiohttp
import asyncio


@asyncio.coroutine
def fetch_async(url):
print(url)
response = yield from aiohttp.request('GET', url)
print(url, response)
response.close()


tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
"""

import asyncio
import requests


async def task(func, *args):
    """Run the blocking call func(*args) in the default executor and await it.

    Bridges requests (blocking I/O) into asyncio by delegating each call
    to a thread pool via loop.run_in_executor.  Written with async/await
    because the legacy @asyncio.coroutine decorator used by the original
    article was removed in Python 3.11.
    """
    print(func, args)
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)  # e.g. requests.get('http://www.***.com/wupeiqi/')
    response = await future
    print(response.url, response.content)


tasks = [
    task(requests.get, 'http://www.***.com/wupeiqi/'),
    task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

asyncio

性能相关异步IO_封装性能相关异步IO_异步io_02

"""
import gevent

import requests
from gevent import monkey
monkey.patch_all()


def task(method, url, req_kwargs):
print(method, url, req_kwargs)
response = requests.request(method=method, url=url, **req_kwargs)
print(response.url, response.content)

# ##### 发送请求 #####
# gevent.joinall([
# gevent.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
# gevent.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
# gevent.spawn(task, method='get', url='https://github.com/', req_kwargs={}),
# ])
# ##### 发送请求(协程池控制最大协程数量) #####
from gevent.pool import Pool
pool = Pool(5)
gevent.joinall([
pool.spawn(task, method='get', url='https://www.python.org/', req_kwargs={}),
pool.spawn(task, method='get', url='https://www.yahoo.com/', req_kwargs={}),
pool.spawn(task, method='get', url='https://www.github.com/', req_kwargs={}),
])
"""
import grequests  # gevent + requests; original text imported it twice


request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]


# ##### Send all requests concurrently and collect the response list #####
# Failed requests (timeout, bad DNS) appear as None in the result list.
response_list = grequests.map(request_list, size=5)
print(response_list)

gevent

性能相关异步IO_封装性能相关异步IO_异步io_02

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


def one_done(arg):
    """Per-page callback: fires with the body of one finished download."""
    print(arg)


def all_done(arg):
    """Fires once every Deferred in the DeferredList has resolved."""
    print('done')
    reactor.stop()


@defer.inlineCallbacks
def task(url):
    # NOTE(review): getPage is deprecated in modern Twisted in favour of
    # twisted.web.client.Agent; kept here to match the article.
    res = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
    res.addCallback(one_done)
    yield res


url_list = [
    'http://www.***.com',
    'http://www.***.com',
    'http://www.***.com',
    'http://www.***.com',
]

defer_list = []  # Deferreds: requests already sent, results pending
for url in url_list:
    v = task(url)
    defer_list.append(v)

d = defer.DeferredList(defer_list)
d.addBoth(all_done)


reactor.run()  # event loop; blocks until reactor.stop()

twisted

性能相关异步IO_封装性能相关异步IO_异步io_02

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

# Outstanding request counter; the loop is stopped when it reaches 0.
COUNT = 0


def handle_response(response):
    """Per-response callback; stops the IOLoop after the last response."""
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)
    # Same termination idea as the twisted example:
    # ioloop.IOLoop.current().stop()
    if COUNT == 0:
        ioloop.IOLoop.current().stop()


def func():
    """Kick off all fetches; runs as the first IOLoop callback."""
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    global COUNT
    COUNT = len(url_list)
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()  # blocks until stop() is called

tornado


最牛逼自定义异步io框架

import socket
import select

# ########################## HTTP请求本质,阻塞 ##########################
"""
sk = socket.socket()
# 1.连接
sk.connect(('www.baidu.com',80,)) # IO阻塞
print('连接成功了...')

# 2. 连接成功发送消息
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. 等待着服务端响应
data = sk.recv(8096) # IO阻塞
print(data)

# 关闭连接
sk.close()
"""
# ########################## HTTP请求本质,非阻塞 ##########################
"""
sk = socket.socket()
sk.setblocking(False)
# 1.连接
try:
sk.connect(('www.baidu.com',80,)) # IO阻塞
print('连接成功了...')
except BlockingIOError as e:
print(e)
# 2. 连接成功发送消息
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# sk.send(b'POST / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\nk1=v1&k2=v2')

# 3. 等待着服务端响应
data = sk.recv(8096) # IO阻塞
print(data)

# 关闭连接
sk.close()
"""

class HttpRequest:
    """Pairs a non-blocking socket with its target host and a callback.

    Implements fileno() so select.select() can poll the wrapped socket
    directly: select accepts any object that exposes a file descriptor.
    """

    def __init__(self, sk, host, callback):
        self.socket = sk          # non-blocking socket, connect in progress
        self.host = host          # target host, reused for the Host header
        self.callback = callback  # invoked with an HttpResponse when done

    def fileno(self):
        # select() calls this to obtain the underlying descriptor.
        return self.socket.fileno()

class HttpResponse:
    """Minimal parser that splits a raw HTTP reply into headers and body."""

    def __init__(self, recv_data):
        self.recv_data = recv_data  # raw bytes as received from the socket
        self.header_dict = {}       # header name -> value (value keeps its leading space)
        self.body = None

        self.initialize()

    def initialize(self):
        """Split on the first blank line, then parse the header block."""
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            # Lines without a colon (the status line) are skipped.
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest:
    """Tiny async HTTP client: non-blocking sockets multiplexed by select()."""

    def __init__(self):
        self.conn = []        # every in-flight request, watched for readability
        self.connection = []  # requests whose connect() has not completed yet

    def add_request(self, host, callback):
        """Start a non-blocking connection to *host*:80 and register it."""
        # Create the socket outside the try so it is always bound;
        # the original could leave it undefined if socket() itself failed.
        sk = socket.socket()
        sk.setblocking(0)
        try:
            sk.connect((host, 80,))
        except BlockingIOError as e:
            # Expected for a non-blocking connect: completion is reported
            # later by select() through the writable list.
            pass
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        """Event loop: send on connect, read replies, dispatch callbacks."""
        while True:
            # HttpRequest implements fileno(), so select() can watch it directly.
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, '连接成功...')
                # Writable here means the connect() handshake has finished.
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)
            for r in rlist:
                # r is an HttpRequest whose socket has data ready.
                recv_data = bytes()
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        if not chunk:
                            # Peer closed the connection (recv returned b'');
                            # the original looped forever on this case.
                            break
                        recv_data += chunk
                    except Exception as e:
                        # Non-blocking recv raises once the buffer is drained.
                        break
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break

def f1(response):
    """Demo callback: pretend to save the response to a file (prints headers)."""
    print('保存到文件', response.header_dict)


def f2(response):
    """Demo callback: pretend to store the response in a database."""
    print('保存到数据库', response.header_dict)

# Each entry maps a host to the callback that consumes its response.
url_list = [
    {'host': 'www.baidu.com', 'callback': f1},
    {'host': 'cn.bing.com', 'callback': f2},
    {'host': 'www.***.com', 'callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])

req.run()


异步io


- 傻等

response = requests.get(....)

- 机智

response = requests.get(....)

response = requests.get(....)

response = requests.get(....)

response = requests.get(....)


角色:使用者

- 多线程

- 多线程

- 协程(微线程) + 异步IO =》 1个线程发送N个Http请求

- asyncio

- 示例1:asyncio.sleep(5)

- 示例2:自己封装Http数据包

- 示例3:asyncio+aiohttp

aiohttp模块:封装Http数据包 pip3 install aiohttp

- 示例4:asyncio+requests

requests模块:封装Http数据包 pip3 install requests

- gevent,greenlet+异步IO

pip3 install greenlet

pip3 install gevent

- 示例1:gevent+requests

- 示例2:gevent(协程池,最多发多少个请求)+requests

- 示例3:gevent+requests => grequests

pip3 install grequests


- Twisted

pip3 install twisted

- Tornado

pip3 install tornado


=====> gevent > Twisted > Tornado > asyncio

角色:NB开发者


1. socket客户端、服务端

连接阻塞

setblocking(0): 无数据(连接无响应;数据未返回)就报错


2. IO多路复用

客户端:

try:

socket对象1.connect()

socket对象2.connect()

socket对象3.connect()

except Ex..

pass


while True:

r,w,e = select.select([socket对象1,socket对象2,socket对象3,],[socket对象1,socket对象2,socket对象3,],[],0.05)

r = [socket对象1,] # 表示有人给我发送数据

xx = socket对象1.recv()

w = [socket对象1,] # 表示我已经和别人创建连接成功:

socket对象1.send('"""GET /index HTTP/1.0\r\nHost: baidu.com\r\n\r\n"""')



3.

class Foo:


def fileno(self):

obj = socket()

return obj.fileno()


r,w,e = select.select([socket对象?,对象?,对象?,Foo()],[],[])

# 对象必须有: fileno方法,并返回一个文件描述符


========

a. select内部:对象.fileno()

b. Foo()内部封装socket文件描述符

IO多路复用:

r, w, e = select.select(...):在 while 循环中监听多个socket对象,r、w、e 接收变化,利用其特性可以开发异步框架

异步IO:

非阻塞的socket+IO多路复用





上一篇:Celery 分布式任务队列快速入门
下一篇:没有了
网友评论