此文被笔者收录在系列文章 架构师必备(系列) 中,在实践中,委托是创建线程安全类最有效的策略之一:用已有的线程安全类来管理所有状态即可。
一、同步容器-串行
同步容器包括Vector和Hashtable、以及Collections.synchronziedXXX工厂方法创建的容器类,这些类通过封装它们的状态,并对每一个公共方法进行同步而实现了线程安全。这种封装对于单线程程序是线程安全的,对于复合操作,只能采用客户端锁的方式来解决了。
public class UnsafeVectorHelpers {
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}public class SafeVectorHelpers {
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
//客户端加锁的实现方式
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
}//这种调用方式是最可靠的,但是效率也是最低的
synchronized(list){
for (int i=0; i<100;i++){
list.get(i);
}
}
迭代器和ConcurrentModificationException
现在的容器类也都没有消除复合操作产生的问题。在迭代时都需要在迭代期间对容器加锁。有两种方式可以解决:
隐藏迭代器
当容器本身做为一个元素时,我们把关键的方法设置了同步,但有些比如hashCode和equals这样的方法都会间接地调用迭代。可能都会引起ConcurrentModificationException如果把这些元素设置成synchronizedXXX的方式就不会出现这样的问题。比如把下面的HashSet换成synchronizedSet。
public class HiddenIterator {
@GuardedBy("this")
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}
二、并发容器-并行
同步容器通过对容器的所有状态进行串行访问从而实现了它们的线程安全。这样的代码是削弱了并发性,当多个线程共同竞争容器级的锁时,吞吐量就会降低。同步容器类在每个操作的执行期间都持有一个锁。
用并发容器替换同步容器,java5添加了两个新的容器:Queue和BlockingQueue,LinkedList就实现了Queue(有序队列),BlockingQueue扩展了Queue增加了可阻塞的get和set操作。
ConcurrentHashMap
ConcurrentHashMap扩展了Map接口,和HashMap一样是一个哈希表,但使用完全不同的锁策略。可以提供更好的并发性和可伸缩性。它使用一个更加细化的锁,叫分离锁。这种锁机制允许更深层次的共享访问、任意数量的读线程可以并发访问Map、有限数量的写线程可以并发修改Map。也正因不提供独占锁的方式,所以不能使用客户端加锁来创建新的原子操作。
ConcurrentHashMap与其他的并发容器一起,进一步改进了同步容器类,提供不会抛出ConcurrentModificationException的迭代器,因此不需要在容器迭代中加锁,它返回的迭代器具有弱一致改,而非“及时失败”的。
这类的容器只是保证了重要的功能如get、put、containsKey、remove等,像size和isEmpty这样的功能被弱化了,只会提供一个估算的值,因为并发的重要意义在于运动。
如果程序需要同步时优先考虑使用ConcurrentHashMap,只有程序在访问时需要独占锁时才会使用Hashtable和SynchronizedMap。
CopyOnWriteArrayList
CopyOnWriteArrayList是同步List的一个并发替代品,通常情况下它提供了更好的并发性,并避免了在迭代期间对容器加锁和复制。
写入时复制容器的线程安全性来源于这样一个事实:只要有效的不可变对象被正确发布,那么访问它将不再需要更多的同步,在每次需要修改时,它们会创建并重新发布一个新的容器拷贝,以此来实现可变性。
当对容器迭代操作的频率远远高于对容器修改的频率时,使用CopyOnWriteArrayList是个合理的选择,比如事件listener。
三、阻塞队列和生产者-消费者模式
阻塞队列(Blocking queue)提供了可阻塞的put和take方法,以及定时的offer和poll方法。如果Queue满了,put方法会被阻塞真到有空间可用,同时提供了offer方法来返回失败状态,如果Queue是空的,那么take会被阻塞,直到有元素可用,定义上分为有界和无界队列。通用的作法是限制队列的长度,控制生产和消费者的速率,超负荷的工作条目序列化后写入磁盘或用其它方法遏制生产者线程。
生产者--消费者设计是围绕阻塞队列展开的,它实现了生产和消费代码的解耦,只通过一个队列来进行沟通。如果阻塞队列不能满足你的设计需要也可以用信号量(Semaphore)创建其他的阻塞数据结构。
1、类库中有一些阻塞队列的实现,其中LinkedBlockingQueue和ArrayBlockQueue是FIFO队列,与LinkedList和ArrayList类似,但是有比同步List更好的并发性能。
2、PriorityBlockingQueue是一个按优先级顺序排序的队列,它本身可以自然排序(如果元素本身实现了Comparable),也可以定义一个Comparator排序。
3、SynchronousQueue,它其实不是一个真正的队列,因为它不会为队列元素维护存储空间,但它维护一个排列的线程清单,这些线程等待把元素加入或移入队列,就好比生产者--消费者模式中,洗盘子的直接把盘子放进烘干机,不需要盘子架一样。当移交被接受,它就知道消费者已得到了任务。因为 SynchronousQueue没有存储的能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻塞, SynchronousQueue这类队列只在消费者充足的时候比较合适。
桌面搜索
比如扫描本地驱动器并归档文件,为之后的搜索建立索引的代理。类似于Google Desktop或windows索引服务。这有利于把一个大活动分为很多小的活动再进行组合,每个活动只有一个单一的任务。由阻塞队列掌控所有的控制流。很多生产--消费者设计都可以通过使用Executor任务执行框架来实现,它自身就应用了生产--消费者模式。
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
File root) {
this.fileQueue = fileQueue;
this.root = root;
}
public void run() {
crawl(root);
}
private void crawl(File root) throws InterruptedException {
fileQueue.put(entry);
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
while (true)
indexFile(queue.take());
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10; //队列的长度,不致于占用太多的内容
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); //CPU个数,保证CPU忙
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
for (File root : roots) //多线程启动,共享queue,且每次都不会重复,通过共享的queue来完成了所有权的交互。
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
连续的线程限制
在java.util.concurrent中实现的阻塞队列,全部都包含充分的内部同步,从而能安全地将对象从生产者线程发布至消费者线程。
对象池拓展了连续的线程限制,把对象“借给”一个请求线程,只要对象池中含有充足的内部同步,使对象池能安全地发布,并且客户端本身不会发布对象池或者在返回对象池后不再继续使用,所有权可以在线程间安全地传递。这种多个线程协作控制的方式称为连续的线程限制。
双端队列和窃取工作
Deque和BlockingDeque分别扩展了Queue和BlockQueue,允许高效是在头和尾分别进行插入和移除,具体实现ArrayDeque和LinkedBlockingDeque。
双端阻塞队列与一种叫窃取工作的模式相关联,一个消费者设计中,所有的消费者只共享一个工作队列,在窃取工作的设计中,每一个消费者都有一个自己的双端队列,如果一个消费者完成了自己双端队列中的全部工作,它可以偷取其它消费者的双端队列中末尾的任务,这样可以防止竞争一个共享的任务队列,也有了更佳的伸缩性。
窃取工作恰好适合用于解决消费者与生产者同体的问题--当运行到一个任务的某单元时,可能会识别出更多的任务。通过把任务做上记号后,把新任务放在队列的末尾,这样就可以保证所有的线程都保持忙碌状态。
四、阻塞和可中断的方法
线程可能会因为几种原因阻塞或暂停:等待I/O操作结束、等待另一个锁、等待从Thread.sleep唤醒、等待另一个线程的计算结果。当一个线程阻塞挂起时,通常设置阻塞状态blocked、waiting、timed_waiting中的一个。这个等待通常是不受他自己控制的,由外部事件来决定,当外部事件发生后,线程被置回runable状态,重新获得被调度的机会。
BlockingQueue的put和take方法都会抛出一个受检查的InterruptedException。当一个方法能抛出这个异常的时候,说明这个方法是一个阻塞方法。
Thread提供了interrupt方法,用来中断一个线程,每个线程都有一个boolean类型的属性,代表线程的中断状态,中断线程时需要设置这个值。中断是一种协作机制。一个线程不能迫使其他线程停止正在做的事情,中断可以取消比较耗时的操作。当你的代码调用一个可以抛出InterruptedException的方法时。你自己的方法也就成为了一个阻塞方法,要为响应中断做好准备,在类库代码中有两种基本选择:
- 传递InterruptedException:如果能避开异常的话,这是明智的选择,如果不能只需要把InterruptedException传递给你的调用者,然后对特定活动进行简单清理后再抛出。
- 恢复中断:有时你不能抛出InterruptedException,这时你可能必须捕获InterruptedException,并在当前线程中调用interrupt方法从中断中恢复,这样调用栈中更高层的代码要以发现中断已经发生。
不允许在捕获InterruptedException后,不做任务响应。除非你扩展了Thread,并因此控制了所有处于调用堆栈上层的代码,可以掩盖InterruptedException。见下例:
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
//恢复中断的例子
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
}
五、同步工具类
阻塞队列在容器类中是特有的,它不仅可做为容器还可以协调生产者和消费者线程之间的控制流。Synchronizer是一个对象。它根据本身的状态调节线程的控制流,阻塞队列可以扮演一个Synchronizer角色,其它的还有信号量semaphore、关卡barrier、以及闭锁latch。还可以自己创造一个。他们都有类似的结构:封装状态,这些状态决定着线程执行到了某一点时是通过还是被迫等待,它们还提供操控状态的方法,以及高效地等待Synchronizer进入到期望状态的方法。
latch闭锁
它可以延迟线程的进度直到线程到达终止状态。闭锁工作起来像一个大门,直到闭锁达到终点状态之前,门一直关闭,没有线程能通过,在终点状态到来的时候,门开了,允许所有线程通过,一旦闭锁到达了终点状态,就不能再改变状态了永远处于敞开状态。闭锁可以用来确保特定活动直到其他的活动完成后才发生:
- 确保一个服务不会开始,直到它依赖的其他服务都已经开始。。
- 等待,直到活动的所有部分都为继续处理作好充分准备,比如多玩家的游戏。
CountDownLatch是一个灵活的闭锁实现,闭锁的状态包括一个计数器,初始化为一个正数,用来表示需要等待的事件数。countDown方法对计数器做减操作,表示一个事件发生了。而await方法等待计数器达到零,此时所有需要等待的事件都已发生,如果为非零,await会一直阻塞直到计数器为零,或等待线程中断以及超时。
这种方法可以测试线程N倍并发的情况下执行一个任务的时间。开始阀门在最后用countDown()方法减1,导致startGate.awarit()终止中断状态。所有线程开始执行。最后在endGate.countDown()减1.到0时endGate.await();终止中断状态计算响应时间。
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();//等待开始阀门打开,计数器达到0
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
FutureTask
FutureTask的计算是通过Callable实现的,它等价于一个可携带结果的Runnable。并有3个状态:等待、运行、完成(完成包括正常结束、取消、异常)。一旦 FutureTask进入完成状态,它会永远停止在这个状态上。
Future.get的行为依赖于任务的状态,如果它已经完成,get可以立刻得到返回结果,否则会被阻塞直到任务转入完成状态,然后会返回结果或抛出异常。 FutureTask把计算的结果从运行计算的线程传送到需要这个结果的线程, FutureTask的规约保证了这种传递建立在结果的安全发布基础之上。
Excecutor框架利用 FutureTask来完成异步任务,并可以用来进行任何潜在的耗时计算,而且可以在真正需要计算结果之前就启动它们开始计算。
public class Preloader {
private final FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() throws Exception {
return "test";
}
});
private final Thread thread = new Thread(future);
public void start() {
thread.start();
}
public String get() throws Exception {
try {
return future.get();//如果future执行完成,则直接返回,否则此方法阻塞
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) throws Exception {
Preloader p = new Preloader();
p.start();
p.get();
}
}
Semaphore计数信号量
用来控制能够同时访问某特定资源活动的数据、同时执行某一给定操作的数量、实现资源池或给一个容器限定边界。
/*使用Semaphore为容器设置边界*/
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
//Semaphore管理一组虚拟的许可,通过构造函数来设置许可的数量
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();//这个方法会阻塞直到有许可可用
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();//使用完成后,释放许可给Semaphore
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
Barrier栅栏
栅栏类似于闭锁,不同的闭锁等待的是事件,关卡等待的是其它线程,闭锁是一次使用,而关卡是可以重置状态的。可以用于迭代计算
CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点,这在并行迭代算法中很有用,这个算法会把一个问题拆分成一系列相互独立的子问题。当线程到达关卡点时,调用await阻塞,直到所有线程都到达关卡点,如果所有线程都到达了关卡点,关卡就被成功地突破,所有的线程都被释放,关卡会重置以备下一次使用。如果对await的调用超时,或是阻塞中的线程被中断,那么关卡是失败的,所有对await未完成的调用都通过BrokenBarrierException终止。如果成功通过,为每一个线程返回一个唯一的到达索引号,可以用于下一次操作。关卡通常用来模拟:一个步骤的计算可以并行完成,但要求必须完成所有与一个步骤相关的工作后才能进入下一步。
Exchanger是关卡的另一种形式,它是一种两步关卡,在关卡点会交换数据。当两方进行的活动不对称时,这非常有用。比如当一个线程向缓冲区写入数据,这时另一个线程充当消费者使用这个数据。这些线程可以使用Exchanger进行会面,并用完整的缓冲与空缓冲进行交换,当发生交换对象时,交换为双方的对象建立了一个安全的发布。
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 10;
}
}
public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
public static void main(String []args)
{
Board bb = (Board) new BoardImpl();
new CellularAutomata(bb).start();
}
为计算结果建立高效、可伸缩的缓存
几乎每一个服务器就用程序都使用某种形式的高速缓存,复用已有的计算结果可以缩短等待时间,提高吞吐量,代价是占用更多的内存。下面的例子就解决了并发缓存的问题:
//A代表输入,V代表输出public class Memoizer <A, V> implements Computable<A, V> {
//这处之所以这么写,是要注意java的引用传递,调用者在外面创建一个空的map做为形参传进来,
//这样生产-消费者都用共享此map
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
//实现的Computable方法
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval(f == null) {
f = ft;
ft.run(); //c.compute()是在此处运行的,所以if可以去掉
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f); //FutureTask取消时移除
} catch (ExecutionException e) {//FutureTask异常时也移除
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
以上的例子中缓存的是一个Future值,整数来了缓存污染的可能性。如果一个计算被取消或失败,未来深度对这个值进行计算都会表现为取消或失败。所以如果发现计算被取消时,需要从缓存中移除。如果发现异常也会移除。这样新请求中的计算才有可能成功。
Memoizer同样有缓存过期的问题,但可以通过FutureTask的一个子类来完成,它会为每一个结果关联一个过期时间,并周期性的扫描缓存中过期的访问,如果发现就会自动移除旧的计算,给新的腾出空间,这样缓存就不会占用多少内存空间了。
六、小结
- 所有并发问题都归结为如何协调访问并发状态,可变状态越少,保证线程安全就越容易。
- 尽量将域声明为final类型,除非需要是可变的。
- 不可变对象天生是线程安全的,它们简单而安全,可以在没有锁或防御性复制的情况下自由的共享
- 封装使管理复杂度变的更可行,在对象中封装数据,让它们能够更加容易地保持不变;在对象中封装同步,使它能够更容易地遵守同步策略
- 用锁来守护每一个可变变量
- 对同一不变约束中的所有变量都使用相同的锁
- 在非同步的多线程情况下,访问可变变量的程序是存在隐患的
- 不要依赖于可以需要同步的小聪明
- 在设计过程中就考虑线程安全,或者在文档中明确地说明它不是线程安全的
- 文档化你的同步策略
这里一定要记住java的引用传递方式,这样定义为final后就可以实现共享