前言
这里就是分布式锁的最后一个系列,关于Etcd的方式。很多人可能并没有接触过Etcd,也不知道这是个什么东西。所以我们先来介绍一下关于Etcd的基本概念
搞定“Etcd”
What Is Etcd
熟悉Linux的大家都知道,在Linux下存在一个目录:/etc,该目录是一个全局的配置存储目录。
而我们要介绍到的Etcd的最初的主要目的也是来解决集群管理系统中OS升级时的分布式并发控制,配置文件的存储和分发等问题。在云的领域应用最为广泛,由于其特性逐渐为人所知。更多的使用场景在k8s上。
那么,Etcd基于Go语言实现,CoreOS公司开源的一款高可用,一致性的小型Key-Value存储数据库,并且热度不断上升
Etcd是通过Raft共识算法来达到数据一致性的
关于Raft共识算法这里不会过多介绍,大家根据 The Secret Lives of Data 这个网站来进行相关的学习
通常情况下,我们可以使用Etcd来做如下事情:
- 服务注册和发现
- 配置中心
- 集群监控
- 分布式锁,分布式ID
Etcd架构
gRPC
当客户端发送操作请求之后,先会到达gRPC层面,然后gRPC才会将操作的具体指定向后分发到其他组件。
除了接收客户端请求之外,gRPC还需要处理各个节点之间的心跳请求和同步请求
wal【Write Ahead Log】
预写式日志,是实现事务的标准方法,跟MySQL中的redo log类似。
Etcd在操作的时候会先进行写日志的操作,但是此时日志状态为prepare,等待某一个时刻将日志提交落盘并且修改操作数据
wal在日志落盘的时候属于顺序写入,这样能够提高IO性能
snapshot
快照,Leader节点用来向其他节点进行数据同步从而达到主数据一致性的关键
boltdb
boltdb是一个单机的支持事务的kv存储,而etcd的事务就是基于boltdb的事务来实现的。
boltdb为每一个key都创建了一个索引,该索引通过B+Tree来维护。其中该B+Tree存储了key所对应的版本数据。
也就是说每操作一次,etcd都会记录一个版本号,并且会存储对应版本号所对应的数据
Etcd,你过来呀
工欲善其事必先利其器,说的再多,不如实际上手来试一试,接下来我们开始搭建etcd的环境吧
环境规划
一定要记住一句话:好记性不如烂笔头。
拿到一台机器之后,不要盲目上手就开始装各种东西,一定要做好整个环境的规划,不慌不乱
node ip port etcd 192.168.10.200 2379,2380Etcd安装
etcd属于一款开源产品,在github我们就能看到其源码。
如果你本地环境有GO版本的话,那么可以通过编译安装的形式来安装,我不是Go Coder,所以我这里就采用最简单的安装方式
来,跟着我一起操作
yum install -y etcd等待完成之后,etcd的安装也就已经完毕,接下来我们来验证一下
etcd --version etcdctl -v说明已经安装成功了
接下来我们来看一看etcd的配置
基本配置
默认情况下,yum的安装方式会在/etc/etcd下存在配置文件,所以cd /etc/etcd我们进入到这个目录下, 会发现存在etcd.conf
这里最好先备份一下,然后我们再调整配置
# 单机 #[Member] # 监听etcd 各个节点间通信,设置为自己的服务器的IP地址,当前最好能够指定hostname ETCD_LISTEN_PEER_URLS="http://192.168.10.200:2380" # 监听客户端通信 ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379" #[Clustering] # 对外公告的该节点客户端监听地址 ETCD_ADVERTISE_CLIENT_URLS="http://192.168.10.200:2379"这是单机版的最小配置,其他的如鉴权等操作在对应位置配置
然后我们启动etcd
systemctl start etcd实际生产环境下单机版可用性不高,我们接下来介绍一下集群操作
集群配置
环境规划就不说了,先按照单机版本安装成功,然后直接看配置
首先,需要注意的是,Etcd集群组成最少需要三台节点,需要用于选取Leader节点,多的话最好是奇数台,那么配置如下
一定要注意ETCD_NAME,否则会启动失败
#[Member] ETCD_DATA_DIR="/var/lib/etcd/default.etcd" TCD_LISTEN_PEER_URLS="http://192.168.10.200:2380" ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379,http://127.0.0.1:2379" ETCD_NAME="slave01" #[Clustering] ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.10.200:2380" ETCD_ADVERTISE_CLIENT_URLS="http://192.168.10.200:2379" ETCD_INITIAL_CLUSTER="master=http://192.168.10.201:2380,slave01=http://192.168.10.200:2380,slave02=http://192.168.10.202:2380" ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster" ETCD_INITIAL_CLUSTER_STATE="new"以上配置为集群的最小配置,配置完之后启动etcd就好。而且需要注意的是:
- 在ETCD_INITIAL_CLUSTER中,名称必须和自身节点的ETCD_NAME保持一致
- 在ETCD_INITIAL_CLUSTER中,规划的主节点一定要写在前面
问题:etcd cluster is unavailable or misconfigured
在集群过程中可能会遇到如下问题:这是因为etcd本身无法找到127.0.0.1的原因
Error: client: etcd cluster is unavailable or misconfigured; error #0: dial tcp 127.0.0.1:4001: connect: connection refused ; error #1: dial tcp 127.0.0.1:2379: connect: connection refused error #0: dial tcp 127.0.0.1:4001: connect: connection refused error #1: dial tcp 127.0.0.1:2379: connect: connection refused所以:
- ETCD_LISTEN_CLIENT_URLS="http://192.168.10.200:2379,http://127.0.0.1:2379"该配置是重点
Etcd相关操作
相关操作不是本节重点,这里就简单列出一些
这里需要注意一下:etcdctl 默认使用V2版本的API,如果想要换成V3的话,进行如下操作
echo 'export ETCDCTL_API=3' >> /etc/profile source /etc/profile接下来就是具体的操作命令
# 会列出相对的帮助列表 etcdctl # 列出集群节点的信息 etcdctl member list # 插入,读取 etcdctl put key value etcdctl get key # 列出一个key的详细信息 etcdctl get key -w json # 监控指定key,包括增删改动作都能监控到 etcdctl watch key # 删除 etcdctl del key就列出这些吧,感兴趣的大家下来自己搭建一下,亲自动手感受一下
别动我的“蛋糕”
知其然
Lease机制
Etcd作为一款Key-Value形式的存储数据库,类似于Redis,支持对存储的K-V设置租约,当租约到期时key-value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,避免因为K-V对过期失效而导致锁被删除,也保证分布式的安全性。
同时对锁设置租约,即使锁持有者因故障无法主动释放,锁也能因为租约到期而自动释放
Revision机制
定义的每一个key都带有一个Revision版本号,这个版本号是全局唯一的。每进行一次事务操作,该版本号就会加一。
通过该Revision版本号就能清楚写操作的顺序,在实现分布式锁的时候,多个客户端同时抢锁,根据Revision版本号的大小依次获得锁就可以实现公平锁
Prefix机制
前缀机制,在后面代码实现的时候可能会更清晰一点。
比如我们定义的lockPath = /etcd/lock,多个客户端争抢进行写操作,而此时实际写入到Etcd的key为
- /etcd/lock/{UUID}
这里的UUID表示全局唯一的ID,确保了多个客户端key的唯一性。
而上面我们也说过,基于Revision机制,返回的Revision号不一样。那么我们就可以通过如下方式来判断自己是否可以获取到锁:
- 通过前缀/etcd/lock查询,返回包含客户端Key-Value对列表,同时也会包含各自的Revision。通过判断Revision大小,客户端就可以判断自己是否获取锁
这里很像Zookeeper中的有序节点
Watch机制
即监听,Etcd中Watch机制支持监听某个固定的key,也支持监听某个前缀路径,当被监听者发生变化时客户端将会受到回调通知。
这里和Zookeeper中实现分布式锁的方式非常像:
- 通过Prefix机制获取到的客户端Key-Value对列表中的Revision,并且监听和自己离得最近的一个key。
- 当这个key释放锁之后,自己才能获取到锁
是不是和Zookeeper加锁方式非常像,所以说,一定要记住一点:
- 我们在学习某个知识点的时候,一定要能够对比其中的差异性
知其所以然
那好,了解到锁的原理之后,那么我们就来自己实现一下分布式锁吧
一步一步来,既然是基于Etcd,那么我们就先来获取一下Etcd的客户端
// 注意:客户端使用2379端口,集群的话中间就通过 , 分割 private static final String node = "http://192.168.10.200:2379"; public static Client client() { final ClientBuilder builder = Client.builder().endpoints(node); // 是否需要用户密码 // builder.user(ByteSequence.from()).password(ByteSequence.from()); return builder.build(); }接下来就是加锁和释放锁的过程了,注意睁大眼睛看清楚了,我只说一遍
// 用来暂存线程和key之间的关系 private final ConcurrentMap<Thread, Long> threadData = Maps.newConcurrentMap(); // 获取etcd下锁客户端 private final Lock lockClient; // 获取Lease客户端 private final Lease leaseClient; private final String lockKey; // etcd获取到的加锁地址 private String lockPath; // 租约有效期 private final long leaseTTL; public EtcdLock(Client client, String lockKey, long leaseTTL, TimeUnit unit) { this.lockKey = lockKey; this.leaseTTL = unit.toNanos(leaseTTL); this.lockClient = client.getLockClient(); this.leaseClient = client.getLeaseClient(); } //加锁 public void lock() { Thread currentThread = Thread.currentThread(); // 记录租约 ID Long leaseId = 0L; try { leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID(); // 续租心跳周期, 续约时间的一半 long period = leaseTTL >> 1; // 这里缺少启动定时任务续约:和Redis中的看门狗机制是一样一样的 // 续约方式: leaseClient.keepAliveOnce(leaseId); LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get(); if (lockResponse != null) { lockPath = lockResponse.getKey().toString(Charset.forName(StandardCharsets.UTF_8.name())); LOGGER.info("获取锁成功,锁路径:{},线程:{}", lockPath, currentThread.getName()); } } catch (InterruptedException | ExecutionException e) { LOGGER.error("获取锁失败", e); throw new BusException(e); } // 获取锁成功,锁对象设置 threadData.put(currentThread, leaseId); } // 释放锁 public void unlock() { Thread currentThread = Thread.currentThread(); Long leaseId = threadData.get(currentThread); try { // 释放锁 if (lockPath != null) { lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get(); } // 将定时任务关闭 // 删除租约 if (leaseId != 0L) { leaseClient.revoke(leaseId); } } catch (InterruptedException | ExecutionException e) { LOGGER.error("解锁失败异常:{}", e.getMessage()); throw new BusException(e); } finally { // 移除当前线程资源 threadData.remove(currentThread); } }那接下来就来验证了
private static ExecutorService es = Executors.newFixedThreadPool(10000); private static String key = "/etcd/lock"; public static void main(String[] args) throws InterruptedException { int[] count = {0}; Client client = client(); for (int i = 0; i < 100; i++) { es.submit(() -> { final EtcdLock lock = new EtcdLock(client, key, 20, TimeUnit.SECONDS); try { lock.lock(); count[0]++; } catch (Exception e) { e.printStackTrace(); } finally { try { lock.unlock(); } catch (Exception e) { e.printStackTrace(); } } }); } es.shutdown(); es.awaitTermination(1, TimeUnit.HOURS); System.err.println("执行结果: " + count[0]); }最后
到这里关于Etcd分布式锁就介绍完了,并且整个关于分布式锁的系列也就全部结束了,