此文被笔者收录在系列文章 架构师必备(系列) 中
安全性和活跃度通常互相牵制,我们一般用锁来保证线程安全、线程池和信号量来约束资源的使用,但使用不当就容易造成“死锁”和“资源死锁”。java不能从死锁中进行恢复。恢复应用程序健康的唯一方式就是中止或重启,然后寄希望于不要发生这样的事情。
一、死锁
如果所有线程以通用的固定秩序获得锁,程序就不会出现锁顺序死锁问题了。同时也要注意外部参数的顺序,下面的例子就存在锁顺序死锁的风险。当存在嵌套锁时,我们必须制定锁的顺序解决这个问题,并在整个应用程序中,获得锁都必须以这种方式。
public class LeftRightDeadlock {private final Object left = new Object();
private final Object right = new Object();
public void leftRight() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}
public void rightLeft() {
synchronized (right) {
synchronized (left) {
doSomethingElse();
}
}
}
}
动态的锁顺序避免死锁
有时死锁并不是很容易看出来,可能是由业务引起的,比如下面的转账程序,发生死锁是由业务引起的,当两个线程同时调用transferMoney,一个从X向Y转账,另一个从Y向X转账。程序上不存在死锁,因为我们已经规定了锁的顺序。问题出在了外部调用的入参上。
// 这里有嵌套锁,所以一定要注意锁的顺序public static void transferMoney(Account fromAccount,
Account toAccount,
DollarAmount amount) throws InsufficientFundsException {
synchronized (fromAccount) {
synchronized (toAccount) {
if (fromAccount.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}
}
可以有两种解决方法:顺序锁和“加时赛锁”,加时赛锁原理是在获得两个Account锁之前,就要获得这个“加时赛”锁,这样就能保证一次只有一个线程执行这个有风险的操作以未知的顺序获得锁。制定锁的顺序来避免死锁的例子可以参考:InduceLockOrder.java。这个例子也可能发生死锁,但HASH出现冲突的情况很低,这种技术可以以最小的成本,换来最大的安全性。
下面例子是通过System.identityHashCode定义了锁的顺序。
public class InduceLockOrder {private static final Object tieLock = new Object();
public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException {
class Helper {
public void transfer() throws InsufficientFundsException {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}
int fromHash = System.identityHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);
if (fromHash < toHash) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
} else if (fromHash > toHash) {
synchronized (toAcct) {
synchronized (fromAcct) {
new Helper().transfer();
}
}
} else {
synchronized (tieLock) {
synchronized (fromAcct) {
synchronized (toAcct) {
new Helper().transfer();
}
}
}
}
}
interface DollarAmount extends Comparable<DollarAmount> {
}
interface Account {
void debit(DollarAmount d);
void credit(DollarAmount d);
DollarAmount getBalance();
int getAcctNo();
}
class InsufficientFundsException extends Exception {
}
}
如果Account具有一个唯一的,不可变的,并且具有可比性的key,比如帐号,那制定锁的顺序就更加容易了。通过key来排定对象顺序,这样能省去“加时赛”锁的需要。
开放调用解决协作对象间的死锁
在持有锁的时候调用外部方法是在挑战活跃度问题,外部方法可能会获得其他锁,或者遭遇严重超时的阻塞,当你持有锁的时候会延迟其他试图获得该锁的线程。
public class CooperatingDeadlock {// Warning: deadlock-prone!
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
public synchronized void setLocation(Point location) {
this.location = location;
if (location.equals(destination))
dispatcher.notifyAvailable(this);
}
public synchronized Point getDestination() {
return destination;
}
public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public synchronized Image getImage() {
Image image = new Image();
for (Taxi t : taxis)
image.drawMarker(t.getLocation());
return image;
}
}
class Image {
public void drawMarker(Point p) {
}
}
}
在程序中尽量使用开放调用(当调用的方法不需要持有锁时),依赖于开放调用的程序,相比于那些在持有锁的时候还调用外部方法的程序,更容易进行死锁自由度的分析。用这种方式通过减小synchronized的范围可以解决上面的问题。
@GuardedBy("this") private Point location, destination;private final Dispatcher dispatcher;
public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public synchronized Point getLocation() {
return location;
}
public synchronized void setLocation(Point location) {
boolean reachedDestination;
synchronized (this) {
this.location = location;
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
public synchronized Point getDestination() {
return destination;
}
public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}
@ThreadSafe
class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}
public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}
public Image getImage() {
Set<Taxi> copy;
synchronized (this) {
copy = new HashSet<Taxi>(taxis);
}
Image image = new Image();
for (Taxi t : copy)
image.drawMarker(t.getLocation());
return image;
}
}
二、避免和诊断死锁
在使用定义良好的锁的程序中,监测代码中死锁自由度的策略分为两个部分:首先识别什么地方会获得多个锁,对这些示例进行全局的分析,确保它们锁的顺序在程序中保持一致。尽可能使用开放调用(非获得锁也能调用),这样能从根本上简化分析的难度。
尝试定时的锁
另一项检测死锁和从死锁中恢复的技术,是使用每个显式Lock类中定时tryLock特性,来替代使用内部锁机制。在内部锁的机制中,只要没有获得锁,就会永远保持等待,而显示的锁使你能够定义超时的时间,在规定时间之后tryLock还没有获得锁就返回失败,通过使用超时,尽管这段时间比你预期能够获得锁的时间长很多,你仍然可以在意外发生后重新获得控制权。
一般用这种技术,如果你获取锁的请求超时,可以释放这个锁并后退,等待一会后再尝试(这项技术只有在同时获得两个锁的时候才有效,如果多个锁是在嵌套的方法中被请求的,无法仅仅释放外层的锁)。
public boolean transferMoney(Account fromAcct,Account toAcct,
DollarAmount amount,
long timeout,
TimeUnit unit)
throws InsufficientFundsException, InterruptedException {
long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
long randMod = getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else {
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return false;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
通过线程转储堆栈信息分析死锁
JVM使用线程转储来识别死锁的发生,线程转储包括每个运行中线程的堆栈追踪信息以及与之相似并随之发生的异常,也包括锁的信息。在Unix平台,可以向JVM进程发送SIGOUIT(kill -3)信号或Ctrl+\。windows按Ctrl+Break。
三、其他的活跃度危险
死锁是我们最主要的活跃度危险,并发程序中还有饥饿、丢失、活锁。
饥饿
当线程访问它所需要的资源时却被永久拒绝,以至于不能再继续进行,这样就发生了饥饿,最常见的引是饥饿的资源是CPU周期。在java程序中,使用线程优先级不当、在锁中执行无终止的构建都可能会引起饥饿。
线程优先级实际中并不起什么作用。如果开始改变线程的优先级,程序的行为就变成与平台相关了,还可能引起活跃度问题,大多数并发应用程序可以对所有线程使用相同的优先级。Thread.NORM_PRIORITY。
弱响应性
当计算密集型后台计算任务影响到响应性时,这时优先级会发挥作用,应当降低它们的优先级,不良的锁管理也可能引起弱响应性。如果一个线程长时间占有一个锁,其他想要访问该窗器的线程就必须等待很长时间。
活锁
活锁是线程中活跃度失败的另一种形式,尽管没有被阻塞,线程仍然不能继续,因为它不断重试相同的操作。活锁通常发生在消息处理应用程序中,如果消息处理失败,其中传递消息的底层架构会回退整个事务,并把它置回队首,依次循环。这就叫称为“毒药信息”。解决活锁的一种方案就是对重试机制引入一些随机性。
四、小结
活跃度失败是非常严重的问题,因为除了短时间地中止应用程序,没有任何机制可以恢复这种失败。最常见的活跃度失败是锁顺序死锁。所以在设计时就避免锁顺序死锁,确保多个线程在获得多个锁时,使用一致的顺序。最好的解决方法是在程序中使用开放调用,这么减少一个线程一次请求多个锁的情况,并且使这样的多重锁请求的发生更加明显。