概述
实际生产的过程中为了实现数据库的高可用,不会只有一个数据库节点。至少会搭建主从复制的数据库架构,从库可以作为主库的数据备份,以免主数据库损坏的情况下丢失数据;当访问量增加的时候可以作为读节点承担部分流量等。下面就进行从零开始搭建MySQL的主从架构。
主从复制原理
以MySQL一主两从架构为为例,也就是一个master节点下有两个slave节点,在这套架构下,写操作统一交给master节点,读请求交给slave节点处理。
为了保证master节点和slave节点数据一致,在master节点写入数据后,会同时将数据复制到对应的slave节点。主从复制数据的过程中会用到三个线程,master节点上的binlog dump线程,slave节点的I\O线程和SQL线程。
主从复制的核心流程:
主从复制模式
MySQL的主从复制模式分为:全同步复制,异步复制,半同步复制,增强半同步复制。
- 全同步复制
全同步复制,就是当主库执行完一个事物之后,要求所有的从库也都必须执行完该事务,才可以返回处理结果给客户端;因此虽然全同步复制数据一致性得到保证了,但是主库完成一个事物需要等待所有从库也完成,性能就比较低了。
- 异步复制
异步复制,当主库提交事务后会通知binlog dump线程发送binlog日志给从库,一旦binlog dump线程将binlog日志发送给从库之后,不需要等到从库也同步完成事务,主库就会讲处理结果返回给客户端。
因为主库只管自己执行完事务,就可以将处理结果返回给客户端,而不用关系从库是否执行完事务,这就可能导致短暂的主从数据不一致的问题了,比如刚在主库插入的数据,如果马上在从库查询就可能查询不到。
当主库提交食物后,如果宕机挂掉了,此时可能binlog还没来得及同步给从库,这时候如果为了回复故障切换主从节点的话,就会出现数据丢失的问题,所以异步复制虽然性能高,但数据一致性上是比较弱的。
MySQL默认采用的是异步复制模式。
- 半同步复制
半同步复制就是在同步复制和异步中做了折中选择,我们可以结合着MySQL官网来看下是半同步和主从复制的过程。
当主库提交事务后,至少还需要一个从库返回接收到binlog日志,并成功写入到relaylog的消息,这个的时候,主库才会讲处理结果返回给客户端。
相比前两种复制方式,半同步复制较好地兼顾了数据一致性以及性能损耗的问题。
同时,半同步复制也存在以下几个问题:
- 增强半同步复制
增强半同步复制是MySQL5.7.2后的版本对半同步复制做的一个改进,原理几乎是一样的,主要是解决幻读的问题。
主库配置了参数rpl_semi_sync_master_wait_point=AFTER_SYNC后,主库在存储引擎提交事务前,必须先首都哦啊从库数据同步完成的确认信息后,才能提交事务,以此来解决幻读问题。
主从同步实战
- 准备数据源
config/datasource.properties
# mastersspring.datasource.masters.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.masters.url=jdbc:mysql://192.168.1.111:3306/monomer_order?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull
spring.datasource.masters.username=root
spring.datasource.masters.password=123456
# slaves
spring.datasource.slaves[0].driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.slaves[0].url=jdbc:mysql://192.168.1.112:3306/monomer_order?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull
spring.datasource.slaves[0].username=root
spring.datasource.slaves[0].password=123456
- 配置数据源
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.*;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.util.CollectionUtils;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Data
@Configuration
@PropertySource("classpath:config/datasource.properties")
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceConfig {
/**
* 主库数据源信息
*/
private Map<String, String> masters;
/**
* 从库数据源信息
*/
private List<Map<String, String>> slaves;
@SneakyThrows
@Bean
public DataSource masterDataSource(){
log.info("masters:{}", masters);
if (CollectionUtils.isEmpty(masters)) {
throw new Exception("主库数据源不能为空");
}
return DruidDataSourceFactory.createDataSource(masters);
}
@SneakyThrows
@Bean
public List<DataSource> slaveDataSources(){
if (CollectionUtils.isEmpty(slaves)) {
throw new Exception("从库数据源不能为空");
}
final ArrayList<DataSource> dataSources = new ArrayList<>();
for (Map<String, String> slaveProperties : slaves) {
log.info("slave:{}", slaveProperties);
dataSources.add(DruidDataSourceFactory.createDataSource(slaveProperties));
}
return dataSources;
}
@Bean
@Primary
@DependsOn({"masterDataSource", "slaveDataSources"})
public DataSource routingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slaveDataSources"){
final Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceContextHolder.MASTER, masterDataSource);
for (int i = 0; i < slaveDataSources.size(); i++) {
targetDataSources.put(DataSourceContextHolder.SLAVE + i, slaveDataSources.get(i));
}
final DataSourceRouter dataSourceRouter = new DataSourceRouter();
dataSourceRouter.setTargetDataSources(targetDataSources);
dataSourceRouter.setDefaultTargetDataSource(masterDataSource);
return dataSourceRouter;
}
@Bean
public DataSourceTransactionManager dataSourceTransactionManager(
@Qualifier("routingDataSource"){
return new DataSourceTransactionManager(routingDataSource);
}
}
- 数据源上下文切换
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@Slf4j
public class DataSourceContextHolder {
public static final String MASTER = "master";
public static final String SLAVE = "slave";
private static ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static void setDatasourceType(String dataSourceType){
if (StringUtils.isBlank(dataSourceType)) {
log.error("dataSourceType为空");
}
log.info("设置dataSource: {}", dataSourceType);
CONTEXT_HOLDER.set(dataSourceType);
}
public static String getDataSourceType(){
return CONTEXT_HOLDER.get() == null ? MASTER : CONTEXT_HOLDER.get();
}
public static void remove(){
CONTEXT_HOLDER.remove();
}
}
- 数据源路由实现类
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
@Slf4j
public class DataSourceRouter extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
log.info("当前数据源为: {}", DataSourceContextHolder.getDataSourceType());
return DataSourceContextHolder.getDataSourceType();
}
}
- 数据源切换注解
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
String value() default DataSourceContextHolder.MASTER;
}
- 动态数据源切换切面
import com.xinxin.order.annotation.ReadOnly;
import com.xinxin.order.context.config.DataSourceContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
@Slf4j
@Aspect
@Component
public class DynamicDataSourceAspect implements Ordered {
@Before(value = "execution(* *(..))&& @annotation(readOnly)")
public void before(JoinPoint joinPoint, ReadOnly readOnly) {
log.info(joinPoint.getSignature().getName() + "走从库");
DataSourceContextHolder.setDatasourceType(DataSourceContextHolder.SLAVE);
}
@After(value = "execution(* *(..))&& @annotation(readOnly)")
public void after(JoinPoint joinPoint, ReadOnly readOnly) {
log.info(joinPoint.getSignature().getName() + "清除数据源");
DataSourceContextHolder.remove();
}
@Override
public int getOrder() {
return 0;
}
}
总结
项目整合读写分离主要是通过收到注入数据源,并通过拦截器设置当前线程的数据源类型,需要使用数据源的地方会通过数据源路由器读取当前线程的数据源类型后返回实际的数据源进行数据库的操作。