当前位置 : 主页 > 编程语言 > python >

用 Twisted 发送请求

来源:互联网 收集:自由互联 发布时间:2022-06-15
# coding: utf-8 # @Time : 2022-05-18 13:01 # @Author : AngDH from twisted.internet import reactor from twisted.web.client import Agent from twisted.web.http_headers import Headers # 相当于客户端角色 agent = Agent(reactor) defered =


# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

# Plays the client role.
agent = Agent(reactor)

# GET request with a custom User-Agent header and no request body.
defered = agent.request(
    b'GET',
    b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=None,
)


def successCallback(response):
    """Print the status code of a received response.

    Args:
        response: Object handed over by the Deferred's callback chain;
            only its ``code`` attribute is read.

    Returns:
        None, so the next callback in the chain receives ``None``.
    """
    print('Response received:', response.code)


# Register successCallback on the request Deferred's callback chain.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print a failure in string form.

    Args:
        error: Whatever the Deferred's errback chain passes in; it is only
            converted with ``str()``.

    Returns:
        None, which marks the error as handled for the rest of the chain.
    """
    print("error:", str(error))


# Register errorCallback as the errback on the request Deferred.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Result or failure from the preceding callbacks; unused.
    """
    reactor.stop()


# Run callbackShutdown on both the success path and the failure path.
defered.addBoth(callbackShutdown)

# Start the reactor's event loop; callbackShutdown stops it again.
reactor.run()
# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer)
class BytesProducer(object):
    """Body producer that writes one in-memory bytes object to the consumer.

    Attributes:
        body: The payload passed to the constructor.
        length: ``len(body)``, computed once at construction time.
    """

    def __init__(self, body):
        """Store *body* and record its length."""
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        """Write the whole body to *consumer* in a single call.

        Returns:
            An already-fired Deferred (``succeed(None)``), since nothing
            remains to be produced after the one write.
        """
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        """Do nothing: the body is written in one go by startProducing."""
        pass

    def resumeProducing(self):
        """Do nothing: there is no paused production to resume."""
        pass

    # The original spelled this method "pauseProducting", which does not match
    # the interface; keep the misspelled name as an alias for old callers.
    pauseProducting = pauseProducing

    def stopProducing(self):
        """Do nothing: there is no ongoing production to cancel."""
        pass


# Plays the client role.
agent = Agent(reactor)

# POST request whose body is supplied by a BytesProducer.
defered = agent.request(
    b'POST',
    b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=BytesProducer('请求体'.encode()),
)


def successCallback(response):
    """Print the status code of a received response.

    Args:
        response: Object handed over by the Deferred's callback chain;
            only its ``code`` attribute is read.

    Returns:
        None, so the next callback in the chain receives ``None``.
    """
    print('Response received:', response.code)


# Register successCallback on the request Deferred's callback chain.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print a failure in string form.

    Args:
        error: Whatever the Deferred's errback chain passes in; it is only
            converted with ``str()``.

    Returns:
        None, which marks the error as handled for the rest of the chain.
    """
    print("error:", str(error))


# Register errorCallback as the errback on the request Deferred.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Result or failure from the preceding callbacks; unused.
    """
    reactor.stop()


# Run callbackShutdown on both the success path and the failure path.
defered.addBoth(callbackShutdown)

# Start the reactor's event loop; callbackShutdown stops it again.
reactor.run()
# coding: utf-8
# @Time : 2022-05-18 13:01
# @Author : AngDH
from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

# Plays the client role: every request below is issued through this Agent.
agent = Agent(reactor)


def successCallback(response):
    """Print the status code of a received response.

    Args:
        response: Object handed over by the Deferred's callback chain;
            only its ``code`` attribute is read.

    Returns:
        None, so the next callback in the chain receives ``None``.
    """
    print('Response received:', response.code)


def errorCallback(error):
    """Print a failure in string form.

    Args:
        error: Whatever the Deferred's errback chain passes in; it is only
            converted with ``str()``.

    Returns:
        None, which marks the error as handled for the rest of the chain.
    """
    print("error:", str(error))


def callbackShutdown(ignored):
    """Stop the reactor.

    Args:
        ignored: Result or failure from the preceding callbacks; unused.
    """
    reactor.stop()


# One Deferred per request. A descriptive name replaces the original `_`,
# which conventionally marks a throwaway value rather than a list that is
# used later.
deferreds = []

for url in [
    b"http://www.baidu.com",
    b"http://www.taobao.com",
    b"http://www.jd.com",
    b"http://www.tmall.com",
    b"http://www.tencent.com",
    b"http://www.douban.com",
]:
    d = agent.request(
        b'GET',
        url,
        Headers({"User-Agent": ["python"]}),
        None)
    # Each request reports its own outcome; errorCallback returns None, so
    # a failed request does not leave this Deferred in a failure state.
    d.addCallback(successCallback)
    d.addErrback(errorCallback)
    deferreds.append(d)

# Manage the individual Deferred objects as one unit.
dl = DeferredList(deferreds)
# Stop the reactor via callbackShutdown once the DeferredList itself fires.
dl.addBoth(callbackShutdown)

reactor.run()





上一篇:协程的嵌套
下一篇:没有了
网友评论