为什么要了解RPC
在机器价格越来越便宜的时候,分布式环境的搭建显得异常容易。而随着互联网热度的暴涨,许多热门的服务无法在单机情况下完成任务,分布式的服务便是首选了。简单点来说,我们可以使用10台机器,然后上层搭建一个nginx负载均衡 or lvs负载均衡,这样就可以做到分流了。
然而新问题出现了,我们可以做到分流,但是当我们以集群的姿态去处理服务的时候,我们需要完成对服务的监管,同时各服务之间也有需要了解对方的状态,或者需要对方服务的支持。这样我们就需要做到机器与机器间的通信。
RPC(Remote Procedure Call)就是远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。当然咯,在实际应用场景中,我们可以自己使用socket写一个远程通信的服务,可是既然协议已经存在了,为什么不用了,这个协议是大家都遵守的,在多人合作开发环境下无疑是一个很有用处的东西。
RPC具体操作
可以看到RPC是一个C/S模式类型,客户端需要远程服务器的一个服务,于是发送消息告知服务器,我们需要调用那个服务接口,然后获取到返回数据。这样,这一次调用过程中,进行了两次网络IO。那么这样的一个过程,怎样去完成代码才比较优雅呢?在引用2中有这样一句话
每一个方法底层都是网络io,但是过程都是一样的,传递参数,得到结果,所以这里肯定可以优化,比如在java中可以使用动态代理机制。只需要使用一个服务端的接口类就,让Proxy动态生成一个代理类。服务端就是一个socket的常规服务器,可以使用线程池或者多路复用这样的技术,然后通过反射来调用实际类。还有注册服务的功能,所谓注册就是让接口和真正的实现类对接,最后就可以调用到实现类的方法。
可以去看看他的代码,确实比较优雅!
rpc帮助我们生成了客户端和服务端的消息和io等公共的代码,这样客户端可以直接调用代理类的方法,感知不到远程调用,服务端也只需要注册服务即可,也就是上面的io,反射,代理等等复杂的代码rpc框架都给我们生成了。
这句话我基本也同意。
总结一下:RPC是一个协议,该协议只是定义了方式,调用者可以通过调用该方法完成远程过程调用。因此要实现RPC,我们需要完成几个方面的内容
- 制定服务接口
- 完成服务端和客户端代码
- 编写接口实现
- 注册服务
my code
Server端
package _0403.rpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Service {
Selector selector = null;
ServerSocketChannel serverSocketChannel = null;
ExecutorService executor = Executors.newCachedThreadPool();
public void registerServer(Server serverClass, int port) throws IOException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
if(null == serverClass || port <= 0 || port >= 65535 ) {
System.out.println("parameter error, please check");
}
@SuppressWarnings("resource")
ServerSocket server = new ServerSocket(port);
while(true) {
final Socket socket = server.accept();
Processor processor = new Processor(socket, serverClass);
executor.execute(processor);
}
}
class Processor implements Runnable{
Socket socket;
Server serverClass;
public Processor(Socket socket, Server serverClass) {
this.socket = socket; this.serverClass = serverClass;
}
@Override
public void run() {
ObjectInputStream ois = null; ObjectOutputStream oos = null;
try{
try {
ois = new ObjectInputStream(socket.getInputStream());
oos = new ObjectOutputStream(socket.getOutputStream());
String name = ois.readUTF();
Class<?>[] parametertypes = (Class<?>[]) ois.readObject();
Object[] args = (Object[]) ois.readObject();
Method meth;
try {
meth = serverClass.getClass().getMethod(name, parametertypes);
Object obj = meth.invoke(serverClass, args);
oos.writeObject(obj);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
oos.writeObject(e);
}
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
ois.close();
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String args[]) {
Server server = new HelloServer();
Service service = new Service();
try {
service.registerServer(server, 8888);
} catch (IOException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
客户端
package _0403.rpc;
import java.io.IOException;
public class Client {
public static void main(String[] args) throws IOException {
Server h = RPCClientHolder.referClient(Server.class, "127.0.0.1", 8888);
String re = h.getInfo();
System.out.println(re);
}
}
辅助代码
package _0403.rpc;
public interface Server {
String getInfo();
}
package _0403.rpc;
public class HelloServer implements Server {
@Override
public String getInfo() {
return "nihao";
}
}
package _0403.rpc;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
public class RPCClientHolder {
@SuppressWarnings("unchecked")
public static <T> T referClient(final Class<T> interfaceclass, final String host, final int port) {
if(null == interfaceclass) return null;
if(!interfaceclass.isInterface()) return null;
if(null == host || host.length() <6) return null;
if(port > 65535 || port < 0) return null;
System.out.println("parameter checked");
return (T) Proxy.newProxyInstance(interfaceclass.getClassLoader(), new Class<?>[] {interfaceclass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
try {
oos.writeUTF(method.getName());
oos.writeObject(method.getParameterTypes());
oos.writeObject(args);
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
try {
Object re = ois.readObject();
if(re instanceof Throwable) { throw (Throwable)re; }
return re;
} finally {
ois.close();
}
} finally {
oos.close();
}
} finally {
socket.close();
}
}
});
}
}
后记
理解有可能有偏差,以后如果发现问题会及时修正
PS:在实现代码的时候发现一个问题,如果我用线程池去处理每个select的key值,会出现null exception报错。目前还不了解具体问题是什么。这个问题先记录在这。
—————————————————————————————————— 分隔线
前面的理解确实有偏差,简单的将RPC定义为一种客户端服务器模式了,所以原来给出的代码只是可以通信的代码。
代码已经修改过来了。可以看到客户端代码非常简单的,服务端完成了大部分代码,RPCClientHolder在实际开发过程中应该也是服务端开发人员编写,打包后供客户端调用的
整个服务基本上用到了反射这种java自带的技术来实现,RPCClientHolder类在工作中起到代理的作用。
————————————————————
参考文献
【1】
https://baike.baidu.com/item/%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8%E5%8D%8F%E8%AE%AE/6893245?fr=aladdin&fromid=609861&fromtitle=RPC
【2】https://blog.csdn.net/u010900754/article/details/78081428