当前位置 : 主页 > 编程语言 > java >

手写基于Java RMI的RPC框架

来源:互联网 收集:自由互联 发布时间:2022-10-26
留给读者 其中最大的区别就是ZooKeeper注册中心,注册中心可以有读写监听器,这是一个优势,可以用来实现订阅通知,也能做数据的同步,甚至可以做基于读写分离的RPC框架,而且它是

留给读者

其中最大的区别就是ZooKeeper注册中心,注册中心可以有读写监听器,这是一个优势,可以用来实现订阅通知,也能做数据的同步,甚至可以做基于读写分离的RPC框架,而且它是基于一种树结构key-value的,它可以实现很多自己需要的,只不过不想Nacos一样,直接提供API注册ip地址和端口,以及对应的类,之后我会尽可能扩展一个新的注册中心到rpc-netty-framework中,满足各种不同的需求而做出改变! 使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架

1.框架结构

● 连接器:提供默认链接信息配置和提供连接 ● 注册器:提供注册服务和获取代理对象(没有具体的注册信息) ● RPC静态工厂:创建注册器、获取连接、注册服务和获取代理对象(已经通过静态初始化注册信息)

2.依赖

<dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <!--具体以zookeeper的版本为准--> <version>3.4.11</version> </dependency> </dependencies>

3.项目

3.1 连接器

package com.fyp.rpc.connection; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * @Auther: fyp * @Date: 2022/1/6 * @Description: 提供Zookeeper连接的自定义类型 * @Package: com.fyp.rpc.connection * @Version: 1.0 */ public class ZkConnection { private String zkServer; private int sessionTimeout; public ZkConnection() { super(); this.zkServer = "localhost:2181"; this.sessionTimeout = 10000; } public ZkConnection(String zkServer, int sessionTimeout) { this.zkServer = zkServer; this.sessionTimeout = sessionTimeout; } public ZooKeeper getConnection() throws IOException { return new ZooKeeper(zkServer, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } }

3.2 注册器

package com.fyp.rpc.registry; import com.fyp.rpc.connection.ZkConnection; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.rmi.Naming; import java.rmi.NotBoundException; import java.rmi.Remote; import java.util.List; /** * @Auther: fyp * @Date: 2022/1/6 * @Description: RPC注册器 * @Package: com.fyp.rpc.registry * @Version: 1.0 */ public class FypRpcRegistry { private ZkConnection connection; private String ip; private int port; /** * 注册服务的方法 * @param serviceInterface 服务接口对象 如 : com.fyp.service.UserService.class * @param serviceObject 服务实现类型的对象 如: new com.fyp.service.impl.UserServiceImpl * @throws Exception 抛出异常,代表注册失败 */ public void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, KeeperException, InterruptedException { // rmi = rmi://ip:port/com.fyp.service.UsrService String rmi = "rmi://" + ip + ":" + port + "/" + serviceInterface.getName(); // 拼接一个有规则的zk存储节点命名 String path = "/fyp/rpc/" + serviceInterface.getName(); List<String> children = connection.getConnection().getChildren("/fyp/rpc", false); if(!children.contains(serviceInterface.getName())) { Stat stat = new Stat(); connection.getConnection().getData(path, false, stat); connection.getConnection().delete(path, stat.getCversion()); } connection.getConnection().create(path,rmi.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Naming.rebind(rmi, serviceObject); } /** * 根据服务接口类型,访问zk,获取RMI的远程代理对象 * 1. 拼接一个zk中的节点名称 * 2. 访问zk,查询节点中存储的数据 * 3. 根据查询的结果,创建一个代理对象 * @return */ public <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, NotBoundException, KeeperException, InterruptedException { String path = "/fyp/rpc/" + serviceInterface.getName(); byte[] datas = connection.getConnection().getData(path, false, null); String rmi = new String(datas); Object obj = Naming.lookup(rmi); return (T) obj; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public ZkConnection getConnection() { return connection; } public void setConnection(ZkConnection connection) { this.connection = connection; } }

3.3 RPC静态工厂

package com.fyp.rpc; import com.fyp.rpc.connection.ZkConnection; import com.fyp.rpc.registry.FypRpcRegistry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import java.io.IOException; import java.io.InputStream; import java.rmi.NotBoundException; import java.rmi.Remote; import java.rmi.registry.LocateRegistry; import java.util.List; import java.util.Properties; /** * @Auther: fyp * @Date: 2022/1/7 * @Description: RPC工厂 * @Package: com.fyp.rpc * @Version: 1.0 */ public class FypRpcFactory { private static final Properties config = new Properties(); private static final ZkConnection connection; private static final FypRpcRegistry registry; /** * 初始化过程、 * 固定逻辑,在classpath下,提供配置文件,命名为;fyp-rpc.properties * registry.ip=服务端IP地址,默认为localhost * registry.port=服务端端口号,默认为9090 * zk.server=Zookeeper访问地址,默认为localhost:2181 * zk.sessionTimeout=Zookeeper连接回话超时,默认为10000 * */ static { try { InputStream input = FypRpcRegistry.class.getClassLoader().getResourceAsStream("fyp-rpc.properties"); config.load(input); String serverIp = config.getProperty("registry.ip") == null ? "localhost" : config.getProperty("registry.ip"); int serverPort= config.getProperty("registry.port") == null ? 9090 : Integer.parseInt(config.getProperty("registry.port")); String zkServe = config.getProperty("zk.server") == null ? "localhost:2181" : config.getProperty("zk.server"); int zkSessionTimeout = config.getProperty("zk.sessionTimeout") == null ? 10000 : Integer.parseInt(config.getProperty("zk.sessionTimeout")); connection = new ZkConnection(zkServe,zkSessionTimeout); registry = new FypRpcRegistry(); registry.setIp(serverIp); registry.setPort(serverPort); registry.setConnection(connection); LocateRegistry.createRegistry(serverPort); List<String> children = connection.getConnection().getChildren("/", false); if(!children.contains("fyp")) { connection.getConnection().create("/fyp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } List<String> fypChildren = connection.getConnection().getChildren("/fyp", false); if(!fypChildren.contains("rpc")) { connection.getConnection().create("/fyp/rpc",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); // 初始化发生异常,中断虚拟机 throw new ExceptionInInitializerError(e); } } public static void registerService(Class<? extends Remote> serviceInterface, Remote serviceObject) throws IOException, InterruptedException, KeeperException { registry.registerService(serviceInterface, serviceObject); } public static <T extends Remote> T getServiceProxy(Class<T> serviceInterface) throws IOException, KeeperException, InterruptedException, NotBoundException { return registry.getServiceProxy(serviceInterface); } }

总结: 说白了,RPC框架已经被实现了,最大众的Dubbo大家应该都用过了,这篇文章就是基于RMI技术实现的简易版dubbo,后续会给出优化————服务自动发现注册、服务容错和负载均衡,想了解的的不妨加个收藏。 最后,如果有需要先了解Dubbo再来学习RPC框架的,可以参考学习下面这篇文章。 《Linux环境下Dubbo环境搭建及启动》

【文章原创作者:阿里云代理 http://www.558idc.com/aliyun.html处的文章,转载请说明出处】
上一篇:各种线程状态的打断方法
下一篇:没有了
网友评论