正文
我们上篇文章讲到了查询方法里面的doQuery方法,这里面就是调用JDBC的API了,其中的逻辑比较复杂,我们这边文章来讲,先看看我们上篇文章分析的地方。
SimpleExecutor
public abstract class BaseExecutor implements Executor { protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException; } public class SimpleExecutor extends BaseExecutor { @Override public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { Statement stmt = null; try { Configuration configuration = ms.getConfiguration(); // 创建 StatementHandler StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql); // 创建 Statement stmt = prepareStatement(handler, ms.getStatementLog()); // 执行查询操作 return handler.<E>query(stmt, resultHandler); } finally { // 关闭 Statement closeStatement(stmt); } } }上篇文章我们分析完了StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql),在代码中创建了一个PreparedStatementHandler,我们要接着stmt = prepareStatement(handler, ms.getStatementLog())开始分析,也就是创建 Statement,先不忙着分析,我们先来回顾一下 ,我们以前是怎么使用jdbc的。
jdbc
public class JDBCDemo { /** * 第一步,加载驱动,创建数据库的连接 * 第二步,编写sql * 第三步,需要对sql进行预编译 * 第四步,向sql里面设置参数 * 第五步,执行sql * 第六步,释放资源 * @throws Exception */ public static final String URL = "jdbc:mysql://localhost:3306/demo"; public static final String USER = "root"; public static final String PASSWORD = "123456"; public static void main(String[] args) throws Exception { jdbcDemo("lucy","123"); } public static void jdbcDemo(String username , String password) throws Exception{ Connection conn = null; PreparedStatement psmt = null; ResultSet rs = null; try { //加载驱动程序 Class.forName("com.mysql.jdbc.Driver"); //获得数据库连接 conn = DriverManager.getConnection(URL, USER, PASSWORD); //编写sql String sql = "select * from user where name =? and password = ?";//问号相当于一个占位符 //对sql进行预编译 psmt = conn.prepareStatement(sql); //设置参数 psmt.setString(1, username); psmt.setString(2, password); //执行sql ,返回一个结果集 rs = psmt.executeQuery(); //输出结果 while(rs.next()){ System.out.println(rs.getString("user_name")+" 年龄:"+rs.getInt("age")); } } catch (Exception e) { e.printStackTrace(); }finally{ //释放资源 conn.close(); psmt.close(); rs.close(); } } }上面代码中注释已经很清楚了,我们来看看mybatis中是怎么和数据库打交道的。
SimpleExecutor
public class SimpleExecutor extends BaseExecutor { private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Statement stmt; // 获取数据库连接 Connection connection = getConnection(statementLog); // 创建 Statement stmt = handler.prepare(connection, transaction.getTimeout()); // 为 Statement 设置参数 handler.parameterize(stmt); return stmt; } }在上面的代码中我们终于看到了和jdbc相关的内容了,大概分为下面三个步骤:
- 1、获取数据库连接。
- 2、创建PreparedStatement。
- 3、为PreparedStatement设置运行时参数。
我们先来看看获取数据库连接,跟进代码看看
BaseExecutor
public abstract class BaseExecutor implements Executor { protected Transaction transaction; protected Connection getConnection(Log statementLog) throws SQLException { //通过transaction来获取Connection Connection connection = transaction.getConnection(); if (statementLog.isDebugEnabled()) { return ConnectionLogger.newInstance(connection, statementLog, queryStack); } else { return connection; } } }我们看到是通过Executor中的transaction属性来获取Connection,那我们就先来看看transaction,根据前面的文章中的配置 <transactionManager type="jdbc"/>,则MyBatis会创建一个JdbcTransactionFactory.class 实例,Executor中的transaction是一个JdbcTransaction.class 实例,其实现Transaction接口,那我们先来看看Transaction
JdbcTransaction
我们先来看看其接口Transaction
Transaction
public interface Transaction { //获取数据库连接 Connection getConnection() throws SQLException; //提交事务 void commit() throws SQLException; //回滚事务 void rollback() throws SQLException; //关闭事务 void close() throws SQLException; //获取超时时间 Integer getTimeout() throws SQLException; }接着我们看看其实现类JdbcTransaction
JdbcTransaction
public class JdbcTransaction implements Transaction { private static final Log log = LogFactory.getLog(JdbcTransaction.class); //数据库连接 protected Connection connection; //数据源信息 protected DataSource dataSource; //隔离级别 protected TransactionIsolationLevel level; //是否为自动提交 protected boolean autoCommmit; public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit) { dataSource = ds; level = desiredLevel; autoCommmit = desiredAutoCommit; } public JdbcTransaction(Connection connection) { this.connection = connection; } @Override public Connection getConnection() throws SQLException { //如果事务中不存在connection,则获取一个connection并放入connection属性中 //第一次肯定为空 if (connection == null) { openConnection(); } //如果事务中已经存在connection,则直接返回这个connection return connection; } /** * commit()功能 * @throws SQLException */ @Override public void commit() throws SQLException { if (connection != null && !connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Committing JDBC Connection [" + connection + "]"); } //使用connection的commit() connection.commit(); } } /** * rollback()功能 * @throws SQLException */ @Override public void rollback() throws SQLException { if (connection != null && !connection.getAutoCommit()) { if (log.isDebugEnabled()) { log.debug("Rolling back JDBC Connection [" + connection + "]"); } //使用connection的rollback() connection.rollback(); } } /** * close()功能 * @throws SQLException */ @Override public void close() throws SQLException { if (connection != null) { resetAutoCommit(); if (log.isDebugEnabled()) { log.debug("Closing JDBC Connection [" + connection + "]"); } //使用connection的close() connection.close(); } } protected void setDesiredAutoCommit(boolean desiredAutoCommit) { try { if (connection.getAutoCommit() != desiredAutoCommit) { if (log.isDebugEnabled()) { log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]"); } connection.setAutoCommit(desiredAutoCommit); } } catch (SQLException e) { // Only a very poorly implemented driver would fail here, // and there's not much we can do about that. throw new TransactionException("Error configuring AutoCommit. " + "Your driver may not support getAutoCommit() or setAutoCommit(). " + "Requested setting: " + desiredAutoCommit + ". Cause: " + e, e); } } protected void resetAutoCommit() { try { if (!connection.getAutoCommit()) { // MyBatis does not call commit/rollback on a connection if just selects were performed. // Some databases start transactions with select statements // and they mandate a commit/rollback before closing the connection. // A workaround is setting the autocommit to true before closing the connection. // Sybase throws an exception here. if (log.isDebugEnabled()) { log.debug("Resetting autocommit to true on JDBC Connection [" + connection + "]"); } //通过connection设置事务是否自动提交 connection.setAutoCommit(true); } } catch (SQLException e) { if (log.isDebugEnabled()) { log.debug("Error resetting autocommit to true " + "before closing the connection. Cause: " + e); } } } protected void openConnection() throws SQLException { if (log.isDebugEnabled()) { log.debug("Opening JDBC Connection"); } //通过dataSource来获取connection,并设置到transaction的connection属性中 connection = dataSource.getConnection(); if (level != null) { //通过connection设置事务的隔离级别 connection.setTransactionIsolation(level.getLevel()); } //设置事务是否自动提交 setDesiredAutoCommit(autoCommmit); } @Override public Integer getTimeout() throws SQLException { return null; } }我们看到JdbcTransaction中有一个Connection属性和dataSource属性,使用connection来进行提交、回滚、关闭等操作,也就是说JdbcTransaction其实只是在jdbc的connection上面封装了一下,实际使用的其实还是jdbc的事务。我们看看getConnection()方法
public class JdbcTransaction implements Transaction { //数据库连接 protected Connection connection; //数据源信息 protected DataSource dataSource; @Override public Connection getConnection() throws SQLException { //如果事务中不存在connection,则获取一个connection并放入connection属性中 //第一次肯定为空 if (connection == null) { openConnection(); } //如果事务中已经存在connection,则直接返回这个connection return connection; } protected void openConnection() throws SQLException { if (log.isDebugEnabled()) { log.debug("Opening JDBC Connection"); } //通过dataSource来获取connection,并设置到transaction的connection属性中 connection = dataSource.getConnection(); if (level != null) { //通过connection设置事务的隔离级别 connection.setTransactionIsolation(level.getLevel()); } //设置事务是否自动提交 setDesiredAutoCommit(autoCommmit); } }先是判断当前事务中是否存在connection,如果存在,则直接返回connection,如果不存在则通过dataSource来获取connection,这里我们明白了一点,如果当前事务没有关闭,也就是没有释放connection,那么在同一个Transaction中使用的是同一个connection,我们再来想想,transaction是SimpleExecutor中的属性,SimpleExecutor又是SqlSession中的属性,那我们可以这样说,同一个SqlSession中只有一个SimpleExecutor,SimpleExecutor中有一个Transaction,Transaction有一个connection。我们来看看如下例子
public static void main(String[] args) throws IOException { String resource = "mybatis-config.xml"; InputStream inputStream = Resources.getResourceAsStream(resource); SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); //创建一个SqlSession SqlSession sqlSession = sqlSessionFactory.openSession(); try { EmployeeMapper employeeMapper = sqlSession.getMapper(Employee.class); UserMapper userMapper = sqlSession.getMapper(User.class); List<Employee> allEmployee = employeeMapper.getAll(); List<User> allUser = userMapper.getAll(); Employee employee = employeeMapper.getOne(); } finally { sqlSession.close(); } }我们看到同一个sqlSession可以获取多个Mapper代理对象,则多个Mapper代理对象中的sqlSession引用应该是同一个,那么多个Mapper代理对象调用方法应该是同一个Connection,直到调用close(),所以说我们的sqlSession是线程不安全的,如果所有的业务都使用一个sqlSession,那Connection也是同一个,一个业务执行完了就将其关闭,那其他的业务还没执行完呢。大家明白了吗?我们回归到源码,connection = dataSource.getConnection();,最终还是调用dataSource来获取连接,那我们是不是要来看看dataSource呢?
我们还是从前面的配置文件来看<dataSource type="UNPOOLED|POOLED">,这里有UNPOOLED和POOLED两种DataSource,一种是使用连接池,一种是普通的DataSource,UNPOOLED将会创将new UnpooledDataSource()实例,POOLED将会new pooledDataSource()实例,都实现DataSource接口,那我们先来看看DataSource接口
DataSource
public interface DataSource extends CommonDataSource, Wrapper { //获取数据库连接 Connection getConnection() throws SQLException; //获取数据库连接 Connection getConnection(String username, String password) throws SQLException; }很简单,只有一个获取数据库连接的接口,那我们来看看其实现类
UnpooledDataSource
UnpooledDataSource,从名称上即可知道,该种数据源不具有池化特性。该种数据源每次会返回一个新的数据库连接,而非复用旧的连接。其核心的方法有三个,分别如下:
- initializeDriver - 初始化数据库驱动
- doGetConnection - 获取数据连接
- configureConnection - 配置数据库连接
初始化数据库驱动
看下我们上面使用JDBC的例子,在执行 SQL 之前,通常都是先获取数据库连接。一般步骤都是加载数据库驱动,然后通过 DriverManager 获取数据库连接。UnpooledDataSource 也是使用 JDBC 访问数据库的,因此它获取数据库连接的过程一样。
UnpooledDataSource
public class UnpooledDataSource implements DataSource { private ClassLoader driverClassLoader; private Properties driverProperties; private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>(); private String driver; private String url; private String username; private String password; private Boolean autoCommit; private Integer defaultTransactionIsolationLevel; static { Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); registeredDrivers.put(driver.getClass().getName(), driver); } } public UnpooledDataSource() { } public UnpooledDataSource(String driver, String url, String username, String password) { this.driver = driver; this.url = url; this.username = username; this.password = password; } private synchronized void initializeDriver() throws SQLException { // 检测当前 driver 对应的驱动实例是否已经注册 if (!registeredDrivers.containsKey(driver)) { Class<?> driverType; try { // 加载驱动类型 if (driverClassLoader != null) { // 使用 driverClassLoader 加载驱动 driverType = Class.forName(driver, true, driverClassLoader); } else { // 通过其他 ClassLoader 加载驱动 driverType = Resources.classForName(driver); } // DriverManager requires the driver to be loaded via the system ClassLoader. // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html // 通过反射创建驱动实例 Driver driverInstance = (Driver)driverType.newInstance(); /* * 注册驱动,注意这里是将 Driver 代理类 DriverProxy 对象注册到 DriverManager 中的,而非 Driver 对象本身。 */ DriverManager.registerDriver(new DriverProxy(driverInstance)); // 缓存驱动类名和实例,防止多次注册 registeredDrivers.put(driver, driverInstance); } catch (Exception e) { throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e); } } } //略...... } public class DriverManager { private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>(); public static synchronized void registerDriver(java.sql.Driver driver) throws SQLException { registerDriver(driver, null); } public static synchronized void registerDriver(java.sql.Driver driver, DriverAction da) throws SQLException { /* Register the driver if it has not already been added to our list */ if(driver != null) { registeredDrivers.addIfAbsent(new DriverInfo(driver, da)); } else { // This is for compatibility with the original DriverManager throw new NullPointerException(); } println("registerDriver: " + driver); } }通过反射机制加载驱动Driver,并将其注册到DriverManager中的一个常量集合中,供后面获取连接时使用,为什么这里是一个List呢?我们实际开发中有可能使用到了多种数据库类型,如Mysql、Oracle等,其驱动都是不同的,不同的数据源获取连接时使用的是不同的驱动。
在我们使用JDBC的时候,也没有通过DriverManager.registerDriver(new DriverProxy(driverInstance));去注册Driver啊,如果我们使用的是Mysql数据源,那我们来看Class.forName("com.mysql.jdbc.Driver");这句代码发生了什么 Class.forName主要是做了什么呢?它主要是要求JVM查找并装载指定的类。这样我们的类com.mysql.jdbc.Driver就被装载进来了。而且在类被装载进JVM的时候,它的静态方法就会被执行。我们来看com.mysql.jdbc.Driver的实现代码。在它的实现里有这么一段代码:
static { try { java.sql.DriverManager.registerDriver(new Driver()); } catch (SQLException E) { throw new RuntimeException("Can't register driver!"); } }很明显,这里使用了DriverManager并将该类给注册上去了。所以,对于任何实现前面Driver接口的类,只要在他们被装载进JVM的时候注册DriverManager就可以实现被后续程序使用。
作为那些被加载的Driver实现,他们本身在被装载时会在执行的static代码段里通过调用DriverManager.registerDriver()来把自身注册到DriverManager的registeredDrivers列表中。这样后面就可以通过得到的Driver来取得连接了。
获取数据库连接
在上面例子中使用 JDBC 时,我们都是通过 DriverManager 的接口方法获取数据库连接。我们来看看UnpooledDataSource是如何获取的。
UnpooledDataSource
public class UnpooledDataSource implements DataSource { @Override public Connection getConnection() throws SQLException { return doGetConnection(username, password); } @Override public Connection getConnection(String username, String password) throws SQLException { return doGetConnection(username, password); } private Connection doGetConnection(String username, String password) throws SQLException { Properties props = new Properties(); if (driverProperties != null) { props.putAll(driverProperties); } if (username != null) { // 存储 user 配置 props.setProperty("user", username); } if (password != null) { // 存储 password 配置 props.setProperty("password", password); } // 调用重载方法 return doGetConnection(props); } private Connection doGetConnection(Properties properties) throws SQLException { // 初始化驱动,我们上一节已经讲过了,只用初始化一次 initializeDriver(); // 获取连接 Connection connection = DriverManager.getConnection(url, properties); // 配置连接,包括自动提交以及事务等级 configureConnection(connection); return connection; } private void configureConnection(Connection conn) throws SQLException { if (autoCommit != null && autoCommit != conn.getAutoCommit()) { // 设置自动提交 conn.setAutoCommit(autoCommit); } if (defaultTransactionIsolationLevel != null) { // 设置事务隔离级别 conn.setTransactionIsolation(defaultTransactionIsolationLevel); } } }上面方法将一些配置信息放入到 Properties 对象中,然后将数据库连接和 Properties 对象传给 DriverManager 的 getConnection 方法即可获取到数据库连接。我们来看看是怎么获取数据库连接的
public class DriverManager { private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>(); @CallerSensitive public static Connection getConnection(String url, java.util.Properties info) throws SQLException { return (getConnection(url, info, Reflection.getCallerClass())); } private static Connection getConnection( String url, java.util.Properties info, Class<?> caller) throws SQLException { /* * When callerCl is null, we should check the application's * (which is invoking this class indirectly) * classloader, so that the JDBC driver class outside rt.jar * can be loaded from here. */ // 获取类加载器 ClassLoader callerCL = caller != null ? caller.getClassLoader() : null; synchronized(DriverManager.class) { // synchronize loading of the correct classloader. if (callerCL == null) { callerCL = Thread.currentThread().getContextClassLoader(); } } if(url == null) { throw new SQLException("The url cannot be null", "08001"); } println("DriverManager.getConnection(\"" + url + "\")"); // Walk through the loaded registeredDrivers attempting to make a connection. // Remember the first exception that gets raised so we can reraise it. SQLException reason = null; // 此处省略部分代码 // 这里遍历的是在registerDriver(Driver driver)方法中注册的驱动对象 // 每个DriverInfo包含了驱动对象和其信息 for(DriverInfo aDriver : registeredDrivers) { // If the caller does not have permission to load the driver then // skip it. // 判断是否为当前线程类加载器加载的驱动类 if(isDriverAllowed(aDriver.driver, callerCL)) { try { println(" trying " + aDriver.driver.getClass().getName()); // 获取连接对象,这里调用了Driver的父类的方法 // 如果这里有多个DriverInfo,比喻Mysql和Oracle的Driver都注册registeredDrivers了 // 这里所有的Driver都会尝试使用url和info去连接,哪个连接上了就返回 // 会不会所有的都会连接上呢?不会,因为url的写法不同,不同的Driver会判断url是否适合当前驱动 Connection con = aDriver.driver.connect(url, info); if (con != null) { // Success! // 打印连接成功信息 println("getConnection returning " + aDriver.driver.getClass().getName()); // 返回连接对像 return (con); } } catch (SQLException ex) { if (reason == null) { reason = ex; } } } else { println(" skipping: " + aDriver.getClass().getName()); } } // if we got here nobody could connect. if (reason != null) { println("getConnection failed: " + reason); throw reason; } println("getConnection: no suitable driver found for "+ url); throw new SQLException("No suitable driver found for "+ url, "08001"); } }代码中循环所有注册的驱动,然后通过驱动进行连接,所有的驱动都会尝试连接,但是不同的驱动,连接的URL是不同的,Mysql的url是jdbc:mysql://localhost:3306/demo,以jdbc:mysql://开头,则其Mysql的驱动肯定会判断获取连接的url符合,Oracle的也类似。
由于篇幅原因,我这里就不分析了,大家有兴趣的可以看看,最后由URL对应的驱动获取到Connection返回,好了我们再来看看下一种DataSource
PooledDataSource
PooledDataSource 内部实现了连接池功能,用于复用数据库连接。因此,从效率上来说,PooledDataSource 要高于 UnpooledDataSource。但是最终获取Connection还是通过UnpooledDataSource,只不过PooledDataSource 提供一个存储Connection的功能。
辅助类介绍
PooledDataSource 需要借助两个辅助类帮其完成功能,这两个辅助类分别是 PoolState 和 PooledConnection。PoolState 用于记录连接池运行时的状态,比如连接获取次数,无效连接数量等。同时 PoolState 内部定义了两个 PooledConnection 集合,用于存储空闲连接和活跃连接。PooledConnection 内部定义了一个 Connection 类型的变量,用于指向真实的数据库连接。以及一个 Connection 的代理类,用于对部分方法调用进行拦截。至于为什么要拦截,随后将进行分析。除此之外,PooledConnection 内部也定义了一些字段,用于记录数据库连接的一些运行时状态。接下来,我们来看一下 PooledConnection 的定义。
class PooledConnection implements InvocationHandler { private static final String CLOSE = "close"; private static final Class<?>[] IFACES = new Class<?>[] { Connection.class }; private final int hashCode; private final PooledDataSource dataSource; // 真实的数据库连接 private final Connection realConnection; // 数据库连接代理 private final Connection proxyConnection; // 从连接池中取出连接时的时间戳 private long checkoutTimestamp; // 数据库连接创建时间 private long createdTimestamp; // 数据库连接最后使用时间 private long lastUsedTimestamp; // connectionTypeCode = (url + username + password).hashCode() private int connectionTypeCode; // 表示连接是否有效 private boolean valid; public PooledConnection(Connection connection, PooledDataSource dataSource) { this.hashCode = connection.hashCode(); this.realConnection = connection; this.dataSource = dataSource; this.createdTimestamp = System.currentTimeMillis(); this.lastUsedTimestamp = System.currentTimeMillis(); this.valid = true; // 创建 Connection 的代理类对象 this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this); } // 省略部分代码...... }下面再来看看 PoolState 的定义。
PoolState
public class PoolState { protected PooledDataSource dataSource; // 空闲连接列表 protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>(); // 活跃连接列表 protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>(); // 从连接池中获取连接的次数 protected long requestCount = 0; // 请求连接总耗时(单位:毫秒) protected long accumulatedRequestTime = 0; // 连接执行时间总耗时 protected long accumulatedCheckoutTime = 0; // 执行时间超时的连接数 protected long claimedOverdueConnectionCount = 0; // 超时时间累加值 protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 等待时间累加值 protected long accumulatedWaitTime = 0; // 等待次数 protected long hadToWaitCount = 0; // 无效连接数 protected long badConnectionCount = 0; // 省略部分代码...... }大家记住上面的空闲连接列表和活跃连接列表
获取连接
前面已经说过,PooledDataSource 会将用过的连接进行回收,以便可以复用连接。因此从 PooledDataSource 获取连接时,如果空闲链接列表里有连接时,可直接取用。那如果没有空闲连接怎么办呢?此时有两种解决办法,要么创建新连接,要么等待其他连接完成任务。
PooledDataSource
public class PooledDataSource implements DataSource { private static final Log log = LogFactory.getLog(PooledDataSource.class); private final PoolState state = new PoolState(this); private final UnpooledDataSource dataSource; // OPTIONAL CONFIGURATION FIELDS protected int poolMaximumActiveConnections = 10; protected int poolMaximumIdleConnections = 5; protected int poolMaximumCheckoutTime = 20000; protected int poolTimeToWait = 20000; protected int poolMaximumLocalBadConnectionTolerance = 3; protected String poolPingQuery = "NO PING QUERY SET"; protected boolean poolPingEnabled; protected int poolPingConnectionsNotUsedFor; private int expectedConnectionTypeCode; public PooledDataSource() { //构造器中创建UnpooledDataSource对象 dataSource = new UnpooledDataSource(); } public PooledDataSource(UnpooledDataSource dataSource) { this.dataSource = dataSource; } @Override public Connection getConnection() throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); } private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { synchronized (state) { // 检测空闲连接集合(idleConnections)是否为空 if (!state.idleConnections.isEmpty()) { // Pool has available connection // idleConnections 不为空,表示有空闲连接可以使用,直接从空闲连接集合中取出一个连接 conn = state.idleConnections.remove(0); if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else { // Pool does not have available connection /* * 暂无空闲连接可用,但如果活跃连接数还未超出限制 *(poolMaximumActiveConnections),则可创建新的连接 */ if (state.activeConnections.size() < poolMaximumActiveConnections) { // Can create new connection // 创建新连接,看到没,还是通过dataSource获取连接,也就是UnpooledDataSource获取连接 conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // Cannot create new connection // 连接池已满,不能创建新连接 // 取出运行时间最长的连接 PooledConnection oldestActiveConnection = state.activeConnections.get(0); // 获取运行时长 long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 检测运行时长是否超出限制,即超时 if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection // 累加超时相关的统计字段 state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; // 从活跃连接集合中移除超时连接 state.activeConnections.remove(oldestActiveConnection); // 若连接未设置自动提交,此处进行回滚操作 if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { log.debug("Bad connection. Could not roll back"); } } /* * 创建一个新的 PooledConnection,注意, * 此处复用 oldestActiveConnection 的 realConnection 变量 */ conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); /* * 复用 oldestActiveConnection 的一些信息,注意 PooledConnection 中的 * createdTimestamp 用于记录 Connection 的创建时间,而非 PooledConnection * 的创建时间。所以这里要复用原连接的时间信息。 */ conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); // 设置连接为无效状态 oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait // 运行时间最长的连接并未超时 try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); // 当前线程进入等待状态 state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { // 进行回滚操作 conn.getRealConnection().rollback(); } // 设置统计字段 conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } // 连接无效,此时累加无效连接相关的统计字段 state.badConnectionCount++; localBadConnectionCount++; conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } } if (conn == null) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } return conn; } }从连接池中获取连接首先会遇到两种情况:
- 1、连接池中有空闲连接。
- 2、连接池中无空闲连接。
对于第一种情况,把连接取出返回即可。对于第二种情况,则要进行细分,会有如下的情况。
- 1、活跃连接数没有超出最大活跃连接数。
- 2、活跃连接数超出最大活跃连接数。
对于上面两种情况,第一种情况比较好处理,直接创建新的连接即可。至于第二种情况,需要再次进行细分。
- 1、活跃连接的运行时间超出限制,即超时了
- 2、活跃连接未超时
对于第一种情况,我们直接将超时连接强行中断,并进行回滚,然后复用部分字段重新创建 PooledConnection 即可。对于第二种情况,目前没有更好的处理方式了,只能等待了。
回收连接
相比于获取连接,回收连接的逻辑要简单的多。回收连接成功与否只取决于空闲连接集合的状态,所需处理情况很少,因此比较简单。
我们还是来看看
public class PooledDataSource implements DataSource { @Override public Connection getConnection() throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); } }返回的是PooledConnection的一个代理类,为什么不直接使用PooledConnection的realConnection呢?我们可以看下PooledConnection这个类
class PooledConnection implements InvocationHandler很熟悉是吧,标准的代理类用法,看下其invoke方法
PooledConnection
class PooledConnection implements InvocationHandler { private final PooledDataSource dataSource; private final Connection realConnection; private final Connection proxyConnection; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); // 重点在这里,如果调用了其close方法,则实际执行的是将连接放回连接池的操作 if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) { dataSource.pushConnection(this); return null; } else { try { if (!Object.class.equals(method.getDeclaringClass())) { // issue #579 toString() should never fail // throw an SQLException instead of a Runtime checkConnection(); } // 其他的操作都交给realConnection执行 return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } } } }那我们来看看pushConnection做了什么
public class PooledDataSource implements DataSource { private final PoolState state = new PoolState(this); protected void pushConnection(PooledConnection conn) throws SQLException { synchronized (state) { // 从活跃连接池中移除连接 state.activeConnections.remove(conn); if (conn.isValid()) { // 空闲连接集合未满 if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 回滚未提交的事务 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 创建新的 PooledConnection PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); state.idleConnections.add(newConn); // 复用时间信息 newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); // 将原连接置为无效状态 conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } // 通知等待的线程 state.notifyAll(); } else { // 空闲连接集合已满 state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 回滚未提交的事务 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 关闭数据库连接 conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; } } } }先将连接从活跃连接集合中移除,如果空闲集合未满,此时复用原连接的字段信息创建新的连接,并将其放入空闲集合中即可;若空闲集合已满,此时无需回收连接,直接关闭即可。
连接池总觉得很神秘,但仔细分析完其代码之后,也就没那么神秘了,就是将连接使用完之后放到一个集合中,下面再获取连接的时候首先从这个集合中获取。 还有PooledConnection的代理模式的使用,值得我们学习
好了,我们已经获取到了数据库连接,接下来要创建PrepareStatement了,我们上面JDBC的例子是怎么获取的? psmt = conn.prepareStatement(sql);,直接通过Connection来获取,并且把sql传进去了,我们看看Mybaits中是怎么创建PrepareStatement的
创建PreparedStatement
BaseStatementHandler
stmt = handler.prepare(connection, transaction.getTimeout()); public abstract class BaseStatementHandler implements StatementHandler { protected BoundSql boundSql; @Override public Statement prepare(Connection connection, Integer transactionTimeout) throws SQLException { ErrorContext.instance().sql(boundSql.getSql()); Statement statement = null; try { // 创建 Statement statement = instantiateStatement(connection); // 设置超时和 FetchSize setStatementTimeout(statement, transactionTimeout); setFetchSize(statement); return statement; } catch (SQLException e) { closeStatement(statement); throw e; } catch (Exception e) { closeStatement(statement); throw new ExecutorException("Error preparing statement. Cause: " + e, e); } } protected abstract Statement instantiateStatement(Connection connection) throws SQLException; } public class PreparedStatementHandler extends BaseStatementHandler { @Override protected Statement instantiateStatement(Connection connection) throws SQLException { //获取sql字符串,比如"select * from user where id= ?" String sql = boundSql.getSql(); // 根据条件调用不同的 prepareStatement 方法创建 PreparedStatement if (mappedStatement.getKeyGenerator() instanceof Jdbc3KeyGenerator) { String[] keyColumnNames = mappedStatement.getKeyColumns(); if (keyColumnNames == null) { //通过connection获取Statement,将sql语句传进去 return connection.prepareStatement(sql, PreparedStatement.RETURN_GENERATED_KEYS); } else { return connection.prepareStatement(sql, keyColumnNames); } } else if (mappedStatement.getResultSetType() != null) { return connection.prepareStatement(sql, mappedStatement.getResultSetType().getValue(), ResultSet.CONCUR_READ_ONLY); } else { return connection.prepareStatement(sql); } } }看到没和jdbc的形式一模一样,我们具体来看看connection.prepareStatement做了什么
public class ConnectionImpl implements JdbcConnection, SessionEventListener, Serializable { public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { try { synchronized(this.getConnectionMutex()) { this.checkClosed(); ClientPreparedStatement pStmt = null; boolean canServerPrepare = true; String nativeSql = (Boolean)this.processEscapeCodesForPrepStmts.getValue() ? this.nativeSQL(sql) : sql; if ((Boolean)this.useServerPrepStmts.getValue() && (Boolean)this.emulateUnsupportedPstmts.getValue()) { canServerPrepare = this.canHandleAsServerPreparedStatement(nativeSql); } if ((Boolean)this.useServerPrepStmts.getValue() && canServerPrepare) { if ((Boolean)this.cachePrepStmts.getValue()) { synchronized(this.serverSideStatementCache) { pStmt = (ClientPreparedStatement)this.serverSideStatementCache.remove(new ConnectionImpl.CompoundCacheKey(this.database, sql)); if (pStmt != null) { ((ServerPreparedStatement)pStmt).setClosed(false); ((ClientPreparedStatement)pStmt).clearParameters(); } if (pStmt == null) { try { //这里使用的是ServerPreparedStatement创建PreparedStatement pStmt = ServerPreparedStatement.getInstance(this.getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency); if (sql.length() < (Integer)this.prepStmtCacheSqlLimit.getValue()) { ((ServerPreparedStatement)pStmt).isCacheable = true; } ((ClientPreparedStatement)pStmt).setResultSetType(resultSetType); ((ClientPreparedStatement)pStmt).setResultSetConcurrency(resultSetConcurrency); } catch (SQLException var14) { if (!(Boolean)this.emulateUnsupportedPstmts.getValue()) { throw var14; } pStmt = (ClientPreparedStatement)this.clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false); if (sql.length() < (Integer)this.prepStmtCacheSqlLimit.getValue()) { this.serverSideStatementCheckCache.put(sql, Boolean.FALSE); } } } } } else { try { pStmt = ServerPreparedStatement.getInstance(this.getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency); ((ClientPreparedStatement)pStmt).setResultSetType(resultSetType); ((ClientPreparedStatement)pStmt).setResultSetConcurrency(resultSetConcurrency); } catch (SQLException var13) { if (!(Boolean)this.emulateUnsupportedPstmts.getValue()) { throw var13; } pStmt = (ClientPreparedStatement)this.clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false); } } } else { pStmt = (ClientPreparedStatement)this.clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false); } return (PreparedStatement)pStmt; } } catch (CJException var17) { throw SQLExceptionsMapping.translateException(var17, this.getExceptionInterceptor()); } } }我们只用看最关键的代码,使用ServerPreparedStatement的getInstance返回一个PreparedStatement,其实本质上ServerPreparedStatement继承了PreparedStatement对象,我们看看其构造方法
public class ServerPreparedStatement extends ClientPreparedStatement { protected static ServerPreparedStatement getInstance(JdbcConnection conn, String sql, String db, int resultSetType, int resultSetConcurrency) throws SQLException { return new ServerPreparedStatement(conn, sql, db, resultSetType, resultSetConcurrency); } protected ServerPreparedStatement(JdbcConnection conn, String sql, String db, int resultSetType, int resultSetConcurrency) throws SQLException { super(conn, db); this.checkNullOrEmptyQuery(sql); String statementComment = this.session.getProtocol().getQueryComment(); ((PreparedQuery)this.query).setOriginalSql(statementComment == null ? sql : "/* " + statementComment + " */ " + sql); ((PreparedQuery)this.query).setParseInfo(new ParseInfo(((PreparedQuery)this.query).getOriginalSql(), this.session, this.charEncoding)); this.hasOnDuplicateKeyUpdate = ((PreparedQuery)this.query).getParseInfo().getFirstStmtChar() == 'I' && this.containsOnDuplicateKeyInString(sql); try { this.serverPrepare(sql); } catch (SQLException | CJException var8) { this.realClose(false, true); throw SQLExceptionsMapping.translateException(var8, this.exceptionInterceptor); } this.setResultSetType(resultSetType); this.setResultSetConcurrency(resultSetConcurrency); } }设置运行时参数到 SQL 中
我们已经获取到了PreparedStatement,接下来就是将运行时参数设置到PreparedStatement中,如下代码
handler.parameterize(stmt);我们来看看parameterize方法
public class PreparedStatementHandler extends BaseStatementHandler { @Override public void parameterize(Statement statement) throws SQLException { // 通过参数处理器 ParameterHandler 设置运行时参数到 PreparedStatement 中 parameterHandler.setParameters((PreparedStatement) statement); } } public interface ParameterHandler { void setParameters(PreparedStatement ps) throws SQLException; } public class DefaultParameterHandler implements ParameterHandler { private final TypeHandlerRegistry typeHandlerRegistry; private final MappedStatement mappedStatement; private final Object parameterObject; private final BoundSql boundSql; private final Configuration configuration; @Override public void setParameters(PreparedStatement ps) { ErrorContext.instance().activity("setting parameters").object(mappedStatement.getParameterMap().getId()); /* * 从 BoundSql 中获取 ParameterMapping 列表,每个 ParameterMapping 与原始 SQL 中的 #{xxx} 占位符一一对应 */ List<ParameterMapping> parameterMappings = boundSql.getParameterMappings(); if (parameterMappings != null) { for (int i = 0; i < parameterMappings.size(); i++) { ParameterMapping parameterMapping = parameterMappings.get(i); if (parameterMapping.getMode() != ParameterMode.OUT) { Object value; // 获取属性名 String propertyName = parameterMapping.getProperty(); if (boundSql.hasAdditionalParameter(propertyName)) { // issue #448 ask first for additional params value = boundSql.getAdditionalParameter(propertyName); } else if (parameterObject == null) { value = null; } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { value = parameterObject; } else { // 为用户传入的参数 parameterObject 创建元信息对象 MetaObject metaObject = configuration.newMetaObject(parameterObject); // 从用户传入的参数中获取 propertyName 对应的值 value = metaObject.getValue(propertyName); } TypeHandler typeHandler = parameterMapping.getTypeHandler(); JdbcType jdbcType = parameterMapping.getJdbcType(); if (value == null && jdbcType == null) { jdbcType = configuration.getJdbcTypeForNull(); } try { // 由类型处理器 typeHandler 向 ParameterHandler 设置参数 typeHandler.setParameter(ps, i + 1, value, jdbcType); } catch (TypeException e) { throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e); } catch (SQLException e) { throw new TypeException("Could not set parameters for mapping: " + parameterMapping + ". Cause: " + e, e); } } } } } }首先从boundSql中获取parameterMappings 集合,这块大家可以看看我前面的文章,然后遍历获取 parameterMapping中的propertyName ,如#{name} 中的name,然后从运行时参数parameterObject中获取name对应的参数值,最后设置到PreparedStatement 中,我们主要来看是如何设置参数的。也就是typeHandler.setParameter(ps, i + 1, value, jdbcType);,这句代码最终会向我们例子中一样执行,如下
public class StringTypeHandler extends BaseTypeHandler<String> { @Override public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType) throws SQLException { ps.setString(i, parameter); } }还记得我们的PreparedStatement是什么吗?是ServerPreparedStatement,那我们就来看看ServerPreparedStatement的setString方法
public class ServerPreparedStatement extends ClientPreparedStatement { @Override public void setURL(int parameterIndex, URL x) throws SQLException { checkClosed(); setString(parameterIndex, x.toString()); } } public class ClientPreparedStatement extends com.mysql.cj.jdbc.StatementImpl implements JdbcPreparedStatement { @Override public void setString(int parameterIndex, String x) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { ((PreparedQuery<?>) this.query).getQueryBindings().setString(getCoreParameterIndex(parameterIndex), x); } } } public class ServerPreparedQueryBindings extends AbstractQueryBindings<ServerPreparedQueryBindValue> { @Override public void setString(int parameterIndex, String x) { if (x == null) { setNull(parameterIndex); } else { //根据参数下标从parameterBindings数组总获取BindValue ServerPreparedQueryBindValue binding = getBinding(parameterIndex, false); this.sendTypesToServer.compareAndSet(false, binding.resetToType(MysqlType.FIELD_TYPE_VAR_STRING, this.numberOfExecutions)); //设置参数值 binding.value = x; binding.charEncoding = this.charEncoding; binding.parameterType = MysqlType.VARCHAR; } } } public class ServerPreparedQueryBindings extends AbstractQueryBindings<ServerPreparedQueryBindValue> { public ServerPreparedQueryBindValue getBinding(int parameterIndex, boolean forLongData) { if (this.bindValues[parameterIndex] == null) { // this.bindValues[parameterIndex] = new ServerPreparedQueryBindValue(); } else { if (this.bindValues[parameterIndex].isStream && !forLongData) { this.longParameterSwitchDetected = true; } } //根据参数下标从bindValues数组中获取BindValue return this.bindValues[parameterIndex]; } }执行查询
执行查询操作就是我们文章开头的最后一行代码,如下
return handler.<E>query(stmt, resultHandler);我们来看看query是怎么做的
public class PreparedStatementHandler extends BaseStatementHandler { @Override public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException { PreparedStatement ps = (PreparedStatement) statement; //直接执行ServerPreparedStatement的execute方法 ps.execute(); return resultSetHandler.<E> handleResultSets(ps); } } public class ClientPreparedStatement extends com.mysql.cj.jdbc.StatementImpl implements JdbcPreparedStatement { @Override public boolean execute() throws SQLException { synchronized (checkClosed().getConnectionMutex()) { JdbcConnection locallyScopedConn = this.connection; if (!this.doPingInstead && !checkReadOnlySafeStatement()) { throw SQLError.createSQLException(Messages.getString("PreparedStatement.20") + Messages.getString("PreparedStatement.21"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, this.exceptionInterceptor); } ResultSetInternalMethods rs = null; this.lastQueryIsOnDupKeyUpdate = false; if (this.retrieveGeneratedKeys) { this.lastQueryIsOnDupKeyUpdate = containsOnDuplicateKeyUpdateInSQL(); } this.batchedGeneratedKeys = null; resetCancelledState(); implicitlyCloseAllOpenResults(); clearWarnings(); if (this.doPingInstead) { doPingInstead(); return true; } setupStreamingTimeout(locallyScopedConn); Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket(); String oldDb = null; if (!locallyScopedConn.getDatabase().equals(this.getCurrentDatabase())) { oldDb = locallyScopedConn.getDatabase(); locallyScopedConn.setDatabase(this.getCurrentDatabase()); } // // Check if we have cached metadata for this query... // CachedResultSetMetaData cachedMetadata = null; boolean cacheResultSetMetadata = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.cacheResultSetMetadata).getValue(); if (cacheResultSetMetadata) { cachedMetadata = locallyScopedConn.getCachedMetaData(((PreparedQuery<?>) this.query).getOriginalSql()); } // // Only apply max_rows to selects // locallyScopedConn.setSessionMaxRows(((PreparedQuery<?>) this.query).getParseInfo().getFirstStmtChar() == 'S' ? this.maxRows : -1); rs = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), (((PreparedQuery<?>) this.query).getParseInfo().getFirstStmtChar() == 'S'), cachedMetadata, false); if (cachedMetadata != null) { locallyScopedConn.initializeResultsMetadataFromCache(((PreparedQuery<?>) this.query).getOriginalSql(), cachedMetadata, rs); } else { if (rs.hasRows() && cacheResultSetMetadata) { locallyScopedConn.initializeResultsMetadataFromCache(((PreparedQuery<?>) this.query).getOriginalSql(), null /* will be created */, rs); } } if (this.retrieveGeneratedKeys) { rs.setFirstCharOfQuery(((PreparedQuery<?>) this.query).getParseInfo().getFirstStmtChar()); } if (oldDb != null) { locallyScopedConn.setDatabase(oldDb); } if (rs != null) { this.lastInsertId = rs.getUpdateID(); this.results = rs; } return ((rs != null) && rs.hasRows()); } } }只看最关键的executeInternal方法
public class ClientPreparedStatement extends com.mysql.cj.jdbc.StatementImpl implements JdbcPreparedStatement { protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException { synchronized (checkClosed().getConnectionMutex()) { try { JdbcConnection locallyScopedConnection = this.connection; ((PreparedQuery<?>) this.query).getQueryBindings() .setNumberOfExecutions(((PreparedQuery<?>) this.query).getQueryBindings().getNumberOfExecutions() + 1); ResultSetInternalMethods rs; CancelQueryTask timeoutTask = null; try { timeoutTask = startQueryTimer(this, getTimeoutInMillis()); if (!isBatch) { statementBegins(); } //执行sql并返回结果 rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this, null, maxRowsToRetrieve, (NativePacketPayload) sendPacket, createStreamingResultSet, getResultSetFactory(), metadata, isBatch); if (timeoutTask != null) { stopQueryTimer(timeoutTask, true, true); timeoutTask = null; } } finally { if (!isBatch) { this.query.getStatementExecuting().set(false); } stopQueryTimer(timeoutTask, false, false); } return rs; } catch (NullPointerException npe) { checkClosed(); // we can't synchronize ourselves against async connection-close due to deadlock issues, so this is the next best thing for // this particular corner case. throw npe; } } } }参考: https://www.cnblogs.com/java-chen-hao/p/11758412.html