目录 1. JSCH使用方法 2. JSCH工具类 3. 创建连接池 4. 改造shellUtil 5. 添加配置 6. 线程安全问题解决 1. JSCH使用方法 jsch使用方法 2. JSCH工具类 JSCH工具类 3. 创建连接池 ConnectionPool.java @Slf4j …
目录
- 1. JSCH使用方法
- 2. JSCH工具类
- 3. 创建连接池
- 4. 改造shellUtil
- 5. 添加配置
- 6. 线程安全问题解决
1. JSCH使用方法
jsch使用方法
2. JSCH工具类
JSCH工具类
3. 创建连接池
ConnectionPool.java
@Slf4j public class ConnectionPool { private String strictHostKeyChecking; private Integer timeout; /** * ip地址 */ private String ip = ""; /** * 端口号 */ private Integer port = 22; /** * 用户名 */ private String username = ""; /** * 密码 */ private String password = ""; /** * 每次扩容增加几个连接 */ private int incrementalConnections = 2; /** * 最大连接数 */ private int maxConnections = 10; /** * 最大空闲连接 */ private int maxIdle = 4; /** * 最小空闲连接 */ private int minIdel = 2; private Vector<PooledConnection> connections = null; @PostConstruct private void init() { createPool(); } /** * 构造方法 * * @param strictHostKeyChecking 连接模式 * @param timeout 超时时间 */ public ConnectionPool(String strictHostKeyChecking, Integer timeout) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; } /** * 构造方法 * * @param strictHostKeyChecking 连接模式 * @param timeout 超时时间 * @param incrementalConnections 增量大小 */ public ConnectionPool(String strictHostKeyChecking, Integer timeout, int incrementalConnections) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; this.incrementalConnections = incrementalConnections; } /** * 构造方法 * * @param strictHostKeyChecking 连接模式 * @param timeout 超时时间 * @param incrementalConnections 增量大小 * @param maxConnections 连接池最大连接数 */ public ConnectionPool(String strictHostKeyChecking, Integer timeout, int incrementalConnections, int maxConnections) { this.strictHostKeyChecking = strictHostKeyChecking; this.timeout = timeout; this.incrementalConnections = incrementalConnections; this.maxConnections = maxConnections; } /** * 创建连接池,判断连接池是否创建,如果连接池没有创建则创建连接池 */ public synchronized void createPool() { if (Objects.nonNull(connections)) { return; } connections = new Vector<>(); log.info("create shell connectionPool success!"); } /** * 创建指定数量的连接放入连接池中 * * @param numConnections 创建数量 * @throws JSchException 建立远程连接异常 */ private void createConnections(int numConnections) throws JSchException { for (int x = 0; x < numConnections; x++) { // 
判断是否已达连接池最大连接,如果到达最大连接数据则不再创建连接 if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) { break; } //在连接池中新增一个连接 try { connections.addElement(new PooledConnection(newConnection(), ip)); } catch (JSchException e) { log.error("create shell connection failed {}", e.getMessage()); throw new JSchException(); } log.info("Session connected!"); } } /** * 新一个连接session * * @return 创建的session * @throws JSchException 远程连接异常 */ private Session newConnection() throws JSchException { // 创建一个session JSch jsch = new JSch(); Session session = jsch.getSession(username, ip, port); session.setPassword(password); Properties sshConfig = new Properties(); sshConfig.put("StrictHostKeyChecking", strictHostKeyChecking); session.setConfig(sshConfig); session.connect(timeout); session.setServerAliveInterval(1800); return session; } /** * 获取一个可用session * * @param ip ip地址 * @param port 端口号 * @param username 用户名 * @param password 密码 * @return 可用的session * @throws JSchException 远程连接异常 */ public synchronized Session getConnection(String ip, Integer port, String username, String password) throws JSchException { this.ip = ip; this.port = port; this.username = username; this.password = password; // 连接池还没创建,则返回 null if (Objects.isNull(connections)) { return null; } // 获得一个可用的数据库连接 Session session = getFreeConnection(); // 假如目前没有可以使用的连接,即所有的连接都在使用中,等一会重试 while (Objects.isNull(session)) { wait(250); session = getFreeConnection(); } return session; } /** * 获取一个可用session * * @return 返回可用session * @throws JSchException 远程连接异常 */ private Session getFreeConnection() throws JSchException { Session session = findFreeConnection(); // 如果没有可用连接,则创建连接, if (Objects.isNull(session)) { createConnections(incrementalConnections); session = findFreeConnection(); if (Objects.isNull(session)) { return null; } } return session; } /** * 查找可用连接 * * @return 返回可用连接 */ private Session findFreeConnection() { Session session = null; PooledConnection conn; Enumeration<PooledConnection> enumerate = 
connections.elements(); // 遍历所有的对象,看是否有可用的连接 while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (!ip.equals(conn.getTag())) { continue; } if (!conn.isBusy()) { session = conn.getSession(); conn.setBusy(true); if (!testConnection(session)) { try { session = newConnection(); } catch (JSchException e) { log.error("create shell connection failed {}", e.getMessage()); return null; } conn.setSession(session); } break; } } return session; } /** * 测试连接是否可用 * * @param session 需要测试的session * @return 是否可用 */ private boolean testConnection(Session session) { boolean connected = session.isConnected(); if (!connected) { closeConnection(session); return false; } return true; } /** * 将session放回连接池中 * * @param session 需要放回连接池中的session */ public synchronized void returnConnection(Session session) { // 确保连接池存在,假如连接没有创建(不存在),直接返回 if (Objects.isNull(connections)) { log.error("ConnectionPool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); // 遍历连接池中的所有连接,找到这个要返回的连接对象,将状态设置为空闲 while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (session.equals(conn.getSession())) { conn.setBusy(false); } } } /** * 刷新连接池 * * @throws JSchException 远程连接异常 */ public synchronized void refreshConnections() throws JSchException { // 确保连接池己创新存在 if (Objects.isNull(connections)) { log.error("ConnectionPool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); while (enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (conn.isBusy()) { wait(5000); } closeConnection(conn.getSession()); conn.setSession(newConnection()); conn.setBusy(false); } } /** * 关闭连接池 */ public synchronized void closeConnectionPool() { // 确保连接池存在,假如不存在,返回 if (Objects.isNull(connections)) { log.info("Connection pool does not exist"); return; } PooledConnection conn; Enumeration<PooledConnection> enumerate = connections.elements(); while 
(enumerate.hasMoreElements()) { conn = enumerate.nextElement(); if (conn.isBusy()) { wait(5000); } closeConnection(conn.getSession()); connections.removeElement(conn); } connections = null; } /** * 关闭session会话 * * @param session 需要关闭的session */ private void closeConnection(Session session) { session.disconnect(); } /** * 线程暂停 * * @param mSeconds 暂停时间 */ private void wait(int mSeconds) { try { Thread.sleep(mSeconds); } catch (InterruptedException e) { log.error("{} 线程暂停失败 -> {}", Thread.currentThread().getName(), e.getMessage()); } } /** * 对象连接状态 */ class PooledConnection { /** * 远程连接 */ Session session; /** * 此连接是否正在使用的标志,默认没有正在使用 */ boolean busy = false; /** * 连接标记 */ String tag; /** * 构造函数,根据一个 Session 构造一个 PooledConnection 对象 * * @param session 连接 * @param tag 连接标记 */ public PooledConnection(Session session, String tag) { this.session = session; this.tag = tag; } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } public boolean isBusy() { return busy; } public void setBusy(boolean busy) { this.busy = busy; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } } public int getIncrementalConnections() { return this.incrementalConnections; } public void setIncrementalConnections(int incrementalConnections) { this.incrementalConnections = incrementalConnections; } public int getMaxConnections() { return this.maxConnections; } public void setMaxConnections(int maxConnections) { this.maxConnections = maxConnections; } }
4. 改造shellUtil
ShellUtil.java
@Slf4j @Component @Scope(value = "prototype") public class ShellUtil { /** * ip地址 */ private String ip = ""; /** * 端口号 */ private Integer port = 22; /** * 用户名 */ private String username = ""; /** * 密码 */ private String password = ""; private Session session; private Channel channel; private ChannelExec channelExec; private ChannelSftp channelSftp; private ChannelShell channelShell; private ConnectionPool connectionPool; public ShellUtil(ConnectionPool connectionPool) { this.connectionPool = connectionPool; } /** * 初始化 * * @param ip 远程主机IP地址 * @param port 远程主机端口 * @param username 远程主机登陆用户名 * @param password 远程主机登陆密码 * @throws JSchException JSch异常 */ public void init(String ip, Integer port, String username, String password) throws JSchException { this.ip = ip; this.port = port; this.username = username; this.password = password; } public void init(String ip, String username, String password) throws JSchException { this.ip = ip; this.username = username; this.password = password; } private void getSession() throws JSchException { session = connectionPool.getConnection(ip, port, username, password); if (Objects.isNull(session)) { connectionPool.refreshConnections(); session = connectionPool.getConnection(ip, port, username, password); if (Objects.isNull(session)){ throw new RuntimeException("无可用连接"); } } } /** * 连接多次执行命令,执行命令完毕后需要执行close()方法 * * @param command 需要执行的指令 * @return 执行结果 * @throws Exception 没有执行初始化 */ public String execCmd(String command) throws Exception { initChannelExec(); log.info("execCmd command - > {}", command); channelExec.setCommand(command); channel.setInputStream(null); channelExec.setErrStream(System.err); channel.connect(); StringBuilder sb = new StringBuilder(16); try (InputStream in = channelExec.getInputStream(); InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8); BufferedReader reader = new BufferedReader(isr)) { String buffer; while ((buffer = reader.readLine()) != null) { sb.append("\n").append(buffer); } 
log.info("execCmd result - > {}", sb); return sb.toString(); } } /** * 执行命令关闭连接 * * @param command 需要执行的指令 * @return 执行结果 * @throws Exception 没有执行初始化 */ public String execCmdAndClose(String command) throws Exception { String result = execCmd(command); close(); return result; } /** * 执行复杂shell命令 * * @param cmds 多条命令 * @return 执行结果 * @throws Exception 连接异常 */ public String execCmdByShell(String... cmds) throws Exception { return execCmdByShell(Arrays.asList(cmds)); } /** * 执行复杂shell命令 * * @param cmds 多条命令 * @return 执行结果 * @throws Exception 连接异常 */ public String execCmdByShell(List<String> cmds) throws Exception { String result = ""; initChannelShell(); InputStream inputStream = channelShell.getInputStream(); channelShell.setPty(true); channelShell.connect(); OutputStream outputStream = channelShell.getOutputStream(); PrintWriter printWriter = new PrintWriter(outputStream); for (String cmd : cmds) { printWriter.println(cmd); } printWriter.flush(); byte[] tmp = new byte[1024]; while (true) { while (inputStream.available() > 0) { int i = inputStream.read(tmp, 0, 1024); if (i < 0) { break; } String s = new String(tmp, 0, i); if (s.contains("--More--")) { outputStream.write((" ").getBytes()); outputStream.flush(); } System.out.println(s); } if (channelShell.isClosed()) { System.out.println("exit-status:" + channelShell.getExitStatus()); break; } try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } } outputStream.close(); inputStream.close(); close(); return result; } /** * SFTP文件上传 * * @param src 源地址 * @param dst 目的地址 * @throws Exception 上传文件失败 */ public void put(String src, String dst) throws Exception { put(src, dst, ChannelSftp.OVERWRITE); } /** * SFTP文件上传 * * @param src 源地址 * @param dst 目的地址 * @param mode 上传模式 默认为ChannelSftp.OVERWRITE * @throws Exception 上传文件失败 */ public void put(String src, String dst, int mode) throws Exception { initChannelSftp(); log.info("Upload File {} -> {}", src, dst); channelSftp.put(src, dst, mode); log.info("Upload File 
Success!"); close(); } /** * SFTP文件上传并监控上传进度 * * @param src 源地址 * @param dst 目的地址 * @throws Exception 上传文件失败 */ public void putMonitorAndClose(String src, String dst) throws Exception { putMonitorAndClose(src, dst, ChannelSftp.OVERWRITE); } /** * SFTP文件上传并监控上传进度 * * @param src 源地址 * @param dst 目的地址 * @param mode 上传模式 默认为ChannelSftp.OVERWRITE * @throws Exception 上传文件失败 */ public void putMonitorAndClose(String src, String dst, int mode) throws Exception { initChannelSftp(); FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length()); log.info("Upload File {} -> {}", src, dst); channelSftp.put(src, dst, monitor, mode); log.info("Upload File Success!"); close(); } /** * SFTP文件下载 * * @param src 源文件地址 * @param dst 目的地址 * @throws Exception 下载文件失败 */ public void get(String src, String dst) throws Exception { initChannelSftp(); log.info("Download File {} -> {}", src, dst); channelSftp.get(src, dst); log.info("Download File Success!"); close(); } /** * SFTP文件下载并监控下载进度 * * @param src 源文件地址 * @param dst 目的地址 * @throws Exception 下载文件失败 */ public void getMonitorAndClose(String src, String dst) throws Exception { initChannelSftp(); FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length()); log.info("Download File {} -> {}", src, dst); channelSftp.get(src, dst, monitor); log.info("Download File Success!"); close(); } /** * 删除指定目录文件 * * @param path 删除路径 * @throws Exception 远程主机连接异常 */ public void deleteFile(String path) throws Exception { initChannelSftp(); channelSftp.rm(path); log.info("Delete File {}", path); close(); } /** * 删除指定目录 * * @param path 删除路径 * @throws Exception 远程主机连接异常 */ public void deleteDir(String path) throws Exception { initChannelSftp(); channelSftp.rmdir(path); log.info("Delete Dir {} ", path); close(); } /** * 释放资源 */ public void close() { connectionPool.returnConnection(session); } private void initChannelSftp() throws Exception { getSession(); channel = session.openChannel("sftp"); channel.connect(); // 建立SFTP通道的连接 
channelSftp = (ChannelSftp) channel; if (session == null || channel == null || channelSftp == null) { log.error("请先执行init()"); throw new Exception("请先执行init()"); } } private void initChannelExec() throws Exception { getSession(); // 打开执行shell指令的通道 channel = session.openChannel("exec"); channelExec = (ChannelExec) channel; if (session == null || channel == null || channelExec == null) { log.error("请先执行init()"); throw new Exception("请先执行init()"); } } private void initChannelShell() throws Exception { getSession(); // 打开执行shell指令的通道 channel = session.openChannel("shell"); channelShell = (ChannelShell) channel; if (session == null || channel == null || channelShell == null) { log.error("请先执行init()"); throw new Exception("请先执行init()"); } } }
5. 添加配置
PoolConfiguration.java
/**
 * Spring configuration that exposes the shared {@link ConnectionPool} bean.
 *
 * <p>All settings are read from {@code ssh.*} properties; each one has a default that is used
 * when the property is absent.
 */
@Configuration
public class PoolConfiguration {

    /** Maximum number of pooled sessions ({@code ssh.maxConnections}, default 10). */
    @Value("${ssh.maxConnections:10}")
    private Integer maxConnections;

    /** Number of sessions opened per growth step ({@code ssh.incrementalConnections}, default 2). */
    @Value("${ssh.incrementalConnections:2}")
    private Integer incrementalConnections;

    /** Connect timeout ({@code ssh.timeout}, default 30000). */
    @Value("${ssh.timeout:30000}")
    private Integer timeout;

    /** JSch host key checking mode ({@code ssh.strictHostKeyChecking}, default {@code no}). */
    @Value("${ssh.strictHostKeyChecking:no}")
    private String strictHostKeyChecking;

    /**
     * Builds the connection pool from the injected settings.
     *
     * @return the pool bean
     */
    @Bean
    public ConnectionPool connectionPool() {
        final ConnectionPool pool = new ConnectionPool(
                strictHostKeyChecking,
                timeout,
                incrementalConnections,
                maxConnections);
        return pool;
    }
}
6. 线程安全问题解决
6.1 使用ThreadLocal保存Session
/**
 * Holds the {@code Session} bound to the current thread.
 *
 * <p>The accessors are not {@code synchronized}: a {@link ThreadLocal} keeps a separate value
 * for every thread, so a lock would only make all threads queue on one class-wide monitor.
 */
public class SessionThreadLocal {

    private static final ThreadLocal<Session> threadLocal = new ThreadLocal<>();

    /**
     * Binds a session to the current thread.
     *
     * @param session session to bind
     */
    public static void set(Session session) {
        threadLocal.set(session);
    }

    /**
     * Returns the session bound to the current thread.
     *
     * @return the session, or {@code null} if none is bound
     */
    public static Session get() {
        return threadLocal.get();
    }

    /**
     * Removes the session bound to the current thread.
     */
    public static void remove() {
        threadLocal.remove();
    }
}
6.2 使用springboot中bean的作用域prototype
使用@Lookup注入方式
/**
 * Lookup-method injection point for {@code ShellUtil}.
 *
 * <p>Spring overrides methods annotated with {@code @Lookup} and returns the bean from the
 * container, so the {@code null} below is only a placeholder.
 *
 * @return a {@code ShellUtil} supplied by the container
 */
@Lookup
public ShellUtil getshellUtil() {
    return null;
}

/**
 * Test endpoint that prints the hash code of the {@code ShellUtil} obtained through
 * {@link #getshellUtil()}.
 *
 * @throws Exception declared by the original handler signature
 */
@GetMapping("/test")
public void Test() throws Exception {
    System.out.println(getshellUtil().hashCode());
}
以上为个人经验,希望能给大家一个参考,也希望大家多多支持自由互联。