- json-rpc 源码阅读
- jsonrpcclient的实现
- jsonrpcserver的实现
- 小结
- 小技巧
json-rpc 源码阅读
JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议。JSON-RPC应用很广泛,比如以太坊的API。JSON-RPC的python实现较多,我选择了Exploding Labs 提供的python版本。主要是其它库都比较古老,而e-labs的实现采用最新版本python,支持类型系统,还有一些函数式编程的范式,代码也很简洁,值得学习。
e-labs的JSON-RPC分成客户端和服务端两个库,分别是jsonrpcclient和jsonrpcserver, 代码版本如下表:
- jsonrpcclient的实现
- jsonrpcserver的实现
- 小结
- 小技巧
# request-schema.json { "$schema": "http://json-schema.org/draft-04/schema#", "description": "A JSON RPC 2.0 request", "oneOf": [ { "description": "An individual request", "$ref": "#/definitions/request" }, { "description": "An array of requests", "type": "array", "items": { "$ref": "#/definitions/request" }, "minItems": 1 } ], "definitions": { "request": { "type": "object", "required": [ "jsonrpc", "method" ], "properties": { "jsonrpc": { "enum": [ "2.0" ] }, "method": { "type": "string" }, "id": { "type": [ "string", "number", "null" ], "note": [ "While allowed, null should be avoided: http://www.jsonrpc.org/specification#id1", "While allowed, a number with a fractional part should be avoided: http://www.jsonrpc.org/specification#id2" ] }, "params": { "type": [ "array", "object" ] } }, "additionalProperties": false } } }
- json-rpc请求可以是单个的request对象,也是是批量的request对象数组
- 每个request对象需要符合:
- 必填字段jsonrpc,值枚举类型。目前2.0,其实就是版本号。(之前有1.0版本)
- 必填字段method, 字符串类型。远程函数的名称。
- id字段,支持字符串,数字或者空。为空表示通知无需回应(result)。id确保响应可以一一对应到请求上。
- params字段,支持数组或者字典。
- jsonrpc字段,值为2.0
- result字段,值为调用结果
- error字段,值为异常信息,包括code,message和data三个字段,规范定义了详细的错误清单。
- id同请求的id
- result和error二选一
from jsonrpcclient import request, parse, Ok import logging import requests response = requests.post("http://localhost:5000/", json=request("ping")) parsed = parse(response.json()) if isinstance(parsed, Ok): print(parsed.result) else: logging.error(parsed.message)
- jsonrpcclient只是封装请求request和响应Ok,数据请求的发送由不同协议提供,这里使用requests,另外还有aiohttp的实现等。
- resquest函数封装请求,parse解析响应
- 正常的结果展示result信息,错误的结果展示message信息
request代码很简单, 封装请求成符合JSON-RPC规范的字符串:
# requests.py def request_pure( id_generator: Iterator[Any], method: str, params: Union[Dict[str, Any], Tuple[Any, ...]], id: Any, ) -> Dict[str, Any]: return { "jsonrpc": "2.0", "method": method, **( {"params": list(params) if isinstance(params, tuple) else params} if params else {} ), "id": id if id is not NOID else next(id_generator), } def request_impure( id_generator: Iterator[Any], method: str, params: Union[Dict[str, Any], Tuple[Any, ...], None] = None, id: Any = NOID, ) -> Dict[str, Any]: return request_pure( id_generator or id_generators.decimal(), method, params or (), id ) request_natural = partial(request_impure, id_generators.decimal()) ... request = request_natural
$ curl -X POST http://localhost:5001 -d '{"jsonrpc": "2.0", "method": "ping", "params": {}, "id": 1}'
# response.py class Ok(NamedTuple): result: Any id: Any def __repr__(self) -> str: return f"Ok(result={self.result!r}, id={self.id!r})" class Error(NamedTuple): code: int message: str data: Any id: Any def __repr__(self) -> str: return f"Error(code={self.code!r}, message={self.message!r}, data={self.data!r}, id={self.id!r})" Response = Union[Ok, Error]
def to_result(response: Dict[str, Any]) -> Response: return ( Ok(response["result"], response["id"]) if "result" in response else Error( response["error"]["code"], response["error"]["message"], response["error"].get("data"), response["id"], ) ) def parse(response: Deserialized) -> Union[Response, Iterable[Response]]: return ( map(to_result, response) if isinstance(response, list) else to_result(response) )
parse_json = compose(parse, json.loads)
# flask_server.py from flask import Flask, Response, request from jsonrpcserver import method, Result, Success, dispatch app = Flask(__name__) @method def ping() -> Result: return Success("pong") @app.route("/", methods=["POST"]) def index(): return Response( dispatch(request.get_data().decode()), content_type="application/json" ) if __name__ == "__main__": app.run()
- 使用method装饰ping函数,使它支持rpc调用,ping函数返回的是一个特点的Result数据结构
- 所有rpc调用的http-url都是根目录,服务使用dispatch调度rpc请求
# methods.py Method = Callable[..., Result] Methods = Dict[str, Method] global_methods = dict() def method( f: Optional[Method] = None, name: Optional[str] = None ) -> Callable[..., Any]: """A decorator to add a function into jsonrpcserver's internal global_methods dict. The global_methods dict will be used by default unless a methods argument is passed to `dispatch`. Functions can be renamed by passing a name argument: @method(name=bar) def foo(): ... """ def decorator(func: Method) -> Method: nonlocal name global_methods[name or func.__name__] = func return func return decorator(f) if callable(f) else cast(Method, decorator)
- 将所有的rpc函数都封装到global_methods字典中
- 函数需要返回Result类型
# main.py def dispatch_to_response( request: str, methods: Optional[Methods] = None, *, context: Any = NOCONTEXT, deserializer: Callable[[str], Deserialized] = json.loads, validator: Callable[[Deserialized], Deserialized] = default_validator, post_process: Callable[[Response], Any] = identity, ) -> Union[Response, List[Response], None]: """Takes a JSON-RPC request string and dispatches it to method(s), giving Response namedtuple(s) or None. This is a public wrapper around dispatch_to_response_pure, adding globals and default values to be nicer for end users. Args: request: The JSON-RPC request string. methods: Dictionary of methods that can be called - mapping of function names to functions. If not passed, uses the internal global_methods dict which is populated with the @method decorator. context: If given, will be passed as the first argument to methods. deserializer: Function that deserializes the request string. validator: Function that validates the JSON-RPC request. The function should raise an exception if the request is invalid. To disable validation, pass lambda _: None. post_process: Function that will be applied to Responses. Returns: A Response, list of Responses or None. Examples: >>> dispatch('{"jsonrpc": "2.0", "method": "ping", "id": 1}') '{"jsonrpc": "2.0", "result": "pong", "id": 1}' """ return dispatch_to_response_pure( deserializer=deserializer, validator=validator, post_process=post_process, context=context, methods=global_methods if methods is None else methods, request=request, )
- request 请求的函数名称
- methods 可供调用的函数集合,默认就是之前rpc装饰器中存储的global_methods
- deserializer 请求的反序列化函数,validator请求验证器
- post_process响应处理函数
def to_serializable_one(response: ResponseType) -> Union[Deserialized, None]: return ( serialize_error(response._error) if isinstance(response, Left) else serialize_success(response._value) )
def dispatch_request( methods: Methods, context: Any, request: Request ) -> Tuple[Request, Result]: """Get the method, validates the arguments and calls the method. Returns: A tuple containing the Result of the method, along with the original Request. We need the ids from the original request to remove notifications before responding, and create a Response. """ return ( request, get_method(methods, request.method) .bind(partial(validate_args, request, context)) .bind(partial(call, request, context)), )
- 使用get_method查找rpc响应函数
- 使用validate_args验证rpc请求
- 使用call执行rpc调用
- 3个步骤依次执行,前者的返回值会作为后缀的参数
def call(request: Request, context: Any, method: Method) -> Result: """Call the method. Handles any exceptions raised in the method, being sure to return an Error response. Returns: A Result. """ try: result = method(*extract_args(request, context), **extract_kwargs(request)) # validate_result raises AssertionError if the return value is not a valid # Result, which should respond with Internal Error because its a problem in the # method. validate_result(result) # Raising JsonRpcError inside the method is an alternative way of returning an error # response. except JsonRpcError as exc: return Left(ErrorResult(code=exc.code, message=exc.message, data=exc.data)) # Any other uncaught exception inside method - internal error. except Exception as exc: logger.exception(exc) return Left(InternalErrorResult(str(exc))) return result
- 使用args和kwargs动态执行rpc函数,并将结果进行返回
- 捕获异常,返回标准错误
# response.py class SuccessResult(NamedTuple): result: Any = None class ErrorResult(NamedTuple): code: int message: str data: Any = NODATA # The spec says this value may be omitted # Union of the two valid result types Result = Either[ErrorResult, SuccessResult] def Success(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]: return Right(SuccessResult(*args, **kwargs)) def Error(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]: return Left(ErrorResult(*args, **kwargs))
SuccessResult和ErrorResult是python的两个标准对象;Result是oslash中定义的联合对象,在ErrorResult, SuccessResult中二选一,有些类似rust中的Option;Right封装了正确的结果,Left封装了错误的结果。
我们一起学习了JSON-RPC规范,并且了解了Exploding Labs如何使用 现代python 实现该规范,也接触了一些函数式编程的方式。
start = 0 def gen1(): start +=1 return count # 调用 id = gen1()
def gen2(): start = 0 def incr(): start +=1 return count return incr gen = gen2() # 调用 id = gen()
def hexadecimal(start: int = 1) -> Iterator[str]: """ Incremental hexadecimal numbers. e.g. 1, 2, 3, .. 9, a, b, etc. Args: start: The first value to start with. """ while True: yield "%x" % start start += 1
以上就是python json-rpc 规范源码阅读的详细内容,更多关于python json-rpc 规范的资料请关注自由互联其它相关文章!