当前位置 : 主页 > 大数据 > 区块链 >

what is RPC?

来源:互联网 收集:自由互联 发布时间:2021-06-22
为什么要了解RPC 在机器价格越来越便宜的时候,分布式环境的搭建显得异常容易。而随着互联网热度的暴涨,许多热门的服务无法在单机情况下完成任务,分布式的服务便是首选了。简

为什么要了解RPC


        在机器价格越来越便宜的时候,分布式环境的搭建显得异常容易。而随着互联网热度的暴涨,许多热门的服务无法在单机情况下完成任务,分布式的服务便是首选了。简单点来说,我们可以使用10台机器,然后上层搭建一个nginx负载均衡 or lvs负载均衡,这样就可以做到分流了。
        然而新问题出现了,我们可以做到分流,但是当我们以集群的姿态去处理服务的时候,我们需要完成对服务的监管,同时各服务之间也有需要了解对方的状态,或者需要对方服务的支持。这样我们就需要做到机器与机器间的通信。
        RPC(Remote Procedure Call)就是远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。当然咯,在实际应用场景中,我们可以自己使用socket写一个远程通信的服务,可是既然协议已经存在了,为什么不用了,这个协议是大家都遵守的,在多人合作开发环境下无疑是一个很有用处的东西。

RPC具体操作

可以看到RPC是一个C/S模式类型,客户端需要远程服务器的一个服务,于是发送消息告知服务器,我们需要调用那个服务接口,然后获取到返回数据。这样,这一次调用过程中,进行了两次网络IO。那么这样的一个过程,怎样去完成代码才比较优雅呢?在引用2中有这样一句话

每一个方法底层都是网络io,但是过程都是一样的,传递参数,得到结果,所以这里肯定可以优化,比如在java中可以使用动态代理机制。只需要使用一个服务端的接口类就,让Proxy动态生成一个代理类。服务端就是一个socket的常规服务器,可以使用线程池或者多路复用这样的技术,然后通过反射来调用实际类。还有注册服务的功能,所谓注册就是让接口和真正的实现类对接,最后就可以调用到实现类的方法。

可以去看看他的代码,确实比较优雅!

rpc帮助我们生成了客户端和服务端的消息和io等公共的代码,这样客户端可以直接调用代理类的方法,感知不到远程调用,服务端也只需要注册服务即可,也就是上面的io,反射,代理等等复杂的代码rpc框架都给我们生成了。

这句话我基本也同意。

总结一下:RPC是一个协议,该协议只是定义了方式,调用者可以通过调用该方法完成远程过程调用。因此要实现RPC,我们需要完成几个方面的内容

  1. 制定服务接口
  2. 完成服务端和客户端代码
  3. 编写接口实现
  4. 注册服务

my code

Server端

package _0403.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Service {

    Selector selector = null;   

    ServerSocketChannel serverSocketChannel = null;

    ExecutorService executor = Executors.newCachedThreadPool();

    public void registerServer(Server serverClass, int port) throws IOException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {

        if(null == serverClass || port <= 0 || port >= 65535 ) {
            System.out.println("parameter error, please check");
        }
        @SuppressWarnings("resource")
        ServerSocket server = new ServerSocket(port);
        while(true) {

            final Socket socket = server.accept();
            Processor processor = new Processor(socket, serverClass);
            executor.execute(processor);

        }
    }

    class Processor implements Runnable{

        Socket socket;

        Server serverClass;

        public Processor(Socket socket, Server serverClass) {

            this.socket = socket; this.serverClass = serverClass;
        }

        @Override
        public void run() {
            ObjectInputStream ois = null; ObjectOutputStream oos = null;
            try{
                try {
                    ois = new ObjectInputStream(socket.getInputStream());
                    oos = new ObjectOutputStream(socket.getOutputStream());
                    String name = ois.readUTF();
                    Class<?>[] parametertypes = (Class<?>[]) ois.readObject();
                    Object[] args = (Object[]) ois.readObject();
                    Method meth;
                    try {
                        meth = serverClass.getClass().getMethod(name, parametertypes);
                        Object obj = meth.invoke(serverClass, args);
                        oos.writeObject(obj);
                    } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                        oos.writeObject(e);
                    } 

                } catch (IOException | ClassNotFoundException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        ois.close();
                        oos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

            } finally {

                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String args[]) {

        Server server = new HelloServer();
        Service service = new Service();
        try {
            service.registerServer(server, 8888);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (SecurityException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

客户端

package _0403.rpc;

import java.io.IOException;

public class Client {

    public static void main(String[] args) throws IOException {

        Server h = RPCClientHolder.referClient(Server.class, "127.0.0.1", 8888);
        String re = h.getInfo();
        System.out.println(re);
    }
}

辅助代码

package _0403.rpc;

public interface Server {

    String getInfo();
}

package _0403.rpc;

public class HelloServer implements Server {

    @Override
    public String getInfo() {

        return "nihao";
    }

}

package _0403.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

public class RPCClientHolder {

    @SuppressWarnings("unchecked")
    public static <T> T referClient(final Class<T> interfaceclass, final String host, final int port) {

        if(null == interfaceclass) return null;
        if(!interfaceclass.isInterface()) return null;
        if(null == host || host.length() <6) return null;
        if(port > 65535 || port < 0) return null;
        System.out.println("parameter checked");
        return (T) Proxy.newProxyInstance(interfaceclass.getClassLoader(), new Class<?>[] {interfaceclass}, new InvocationHandler() {

            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {

                Socket socket = new Socket(host, port);
                try {

                    ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                    try {

                        oos.writeUTF(method.getName());
                        oos.writeObject(method.getParameterTypes());
                        oos.writeObject(args);
                        ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                        try {

                            Object re = ois.readObject();
                            if(re instanceof Throwable) { throw (Throwable)re; }
                            return re;
                        } finally {

                            ois.close();
                        }

                    } finally {
                        oos.close();
                    }

                } finally {
                    socket.close();
                }
            }

        });
    }
}

后记

理解有可能有偏差,以后如果发现问题会及时修正

PS:在实现代码的时候发现一个问题,如果我用线程池去处理每个select的key值,会出现null exception报错。目前还不了解具体问题是什么。这个问题先记录在这。

—————————————————————————————————— 分隔线

        前面的理解确实有偏差,简单的将RPC定义为一种客户端服务器模式了,所以原来给出的代码只是可以通信的代码。
        代码已经修改过来了。可以看到客户端代码非常简单的,服务端完成了大部分代码,RPCClientHolder在实际开发过程中应该也是服务端开发人员编写,打包后供客户端调用的
        整个服务基本上用到了反射这种java自带的技术来实现,RPCClientHolder类在工作中起到代理的作用。

————————————————————

参考文献

【1】
https://baike.baidu.com/item/%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8%E5%8D%8F%E8%AE%AE/6893245?fr=aladdin&fromid=609861&fromtitle=RPC
【2】https://blog.csdn.net/u010900754/article/details/78081428

网友评论