此文被笔者收录在系列文章 架构师必备(系列) 中
java中没有提供任何机制,来安全是强迫线程停止手头的工作,Thread.stop和Thread.suspend方法存在严重的缺陷,不能使用。但每个Thread提供了Interruption中断,一种协作机制来协调线程间的操作和控制。这是JAVA中推荐的方式。程序不应该立即停止,应该采用中断这种协作机制来处理,正确的做法是:先清除当前进程中的工作,再终止。正常有四种方法:
- 正常结束;
- 设置一个标志位,由外部线程来控制,原理是设置一个volatile变量,使线程池不在创建新线程达到平滑关闭的效果;适合一直在运行的长时间任务;
- 阻塞线程:用interrupt()方法会马上抛出异常,捕获到这个线程后break跳出强制关闭;
- 未阻塞线程:用interrupt()方法设置中断标志位,然后在循环时用isInterrupted()来判断中断标志位,其实和自定义标志位一样原理;
一、任务取消
当外部代码能在活动自然完成之前,把它更改为完成状态,被称为取消。取消的原因很多种可能,比如:用户请求、限时活动、应用程序设计如此、错误、关闭。
java中没有一种绝对安全停止线程的方法,只能选择相互协作的机制,通过协作,使任务和代码遵循一个统一的协议,用来请求取消。一个可取消的任务必须有取消策略,这个策略是一套程序,规定了不同任务或机制间的协作,保证数据的统一。
@ThreadSafepublic class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
//让上一个程序在1秒后停止,但这并不是严格的一秒,可能存在误差。
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();//这是一个自定义的非阻塞的方法
}
}
中断
线程中断是一个协作机制,一个线程给另一个线程发送信号,通知它在方便或可能的情况下停止正在做的工作,去做其他事情 。但实际上,使用中断来处理取消之外的任何事情都是不明智的,中断通常是实现取消最明知的选择,一般在取消方法中设置中断状态。
每个线程都有一个boolean的中断状态,Thread包含一些中断线程的方法:interrupt方法中断目标线程,isInterrupted返回目标线程的中断状态,interrupted用于清除当前线程的中断状态,这是清除中断状态唯一的方法。中断一般不能与可阻塞的函数一起使用。
静态的interrupted方法应该小心使用,它会清除并发线程的中断状态,如果返回了true,必须进行处理,如果想掩盖这个中断,可以抛出 InterruptedException 异常或者再次调用interrupt来保存中断状态。
调用interrupt并不意味着必须停止目标线程正在进行的工作,它仅仅传递了请求中断的信息,意味着完成当前任务,保证数据结构的统一,然后在下一周期结束。有一些方法对这样的请求很重视,比如wait,sleep(阻塞方式),当它们接到中断请求时会抛出一个异常,或者进入时中断状态就已经被设置了。所以这两个方法尽量不要用。
下例中在两处用到了检测中断技术,因为put是个阻塞操作,所以在之前检测总比在之后检测性能更好。前提是此时没有消费者线程或是put是个很耗时的操作,像这种对中断状态进行显式的检测会对调用可中断的阻塞方法时很有用外,因为通常我们不能得到期望的响应。
public class PrimeProducer extends Thread {private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
//这个例子可以很好的处理阻塞操作的中断问题,通过中断。
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
中断策略
正如需要为任务制定取消策略一样,也应该定制线程中断策略。一个中断策略决定线程如何应对中断请求--当发现中断请求时,它会做什么。
区分任务和线程对中断的反应是很重要的,任务不会在自己拥有的线程中执行,它们借用属于服务的线程,比如线程池,如果代码并不是线程的所有者就应该小心地保存中断状态(如果你给主人打扫房间,主人不在的这段时间你不能把收到的邮件全丢掉,应该收起来待主人回来再处理)。这就是为什么大多数可阻塞的库函数,仅仅抛出InterruptedException做为中断的响应,这也是最合理的策略。
在理中断时应该保存中断状态,也不能简单的是把InterruptedException传递给调用者,应该在它之后恢复中断的状态:Thread.currentThread().interrupt();当检查到中断请求时,任务不需要放弃所有的事情,可以选择推迟直到更合适的时机。这需要记得它已经被请求过中断了,完成当前正在进行的任务,再抛出中断异常或指明中断,这种技术可以保证数据结构不被破坏。
响应中断
有两种处理InterruptedException的实用策略:传递异常和恢复中断状态,使你的方法也成为可中断的阻塞方法,或者保存中断状态,上层调用者代码能对其进行处理。
如果不想或不能传递 InterruptedException 异常,需要另一种方式保存中断请求,因为大多数代码并不知道它们在哪个线程中运行,并再次调用interrupt来恢复中断状态,而不应该掩盖 InterruptedException 异常,如果你的代码没有相应的处理程序,就不应该在catch中捕获这个异常。过早设置中断可能会引起无限循环。
不可取消的任务在任务退出前保存中断
public Task getNextTask(BlockingQueue<Task> queue) {boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
在中断线程之前,应该了解它的中断策略,且不要在外部线程中安排中断。如果要在一个专门的线程中中断任务,这里用到了jion方法,这个方法有不足之处,它如果传一个超时参数,无法确定是由于异常还是因为超时退出的状态。
public class TimedRun2 {private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}
通过Future取消
Future有一个cancel方法,它需要一个boolean参数,它的返回值表示取消尝试是否成功(这仅仅告诉你它是否能接收中断,而不是任务是否检测并处理了中断)。
如果为true并且任务正在一线程中运行,那么这个线程是应该中断的。如果是false并且任务还没启动的话,那这个任务永远不会启动了。除非知道线程的中断策略,否则不应该中断线程,这个例子中cancel何时设置true和false需要考虑。
但任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,这时cancel是安全的。通过Future来中断任务并不影响线程池中其它的线程。在一个专门的线程中中断任务。通过Future来取消任务。
public static void timedRun(Runnable r,long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 下面任务会被取消
} catch (ExecutionException e) {
// task中抛出的异常,重抛出
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经取消,是无害的
task.cancel(true); // interrupt if running,如果为false表示如果还没有启动的话,不要运行这个任务,用于那些不处理中断的任务。
}
}
处理不可中断阻塞
很多可阻塞的库方法通过提前返回和抛出InterruptedException来实现对中断的响应,这使得构建可以响应取消的任务更加容易。但有些阻塞方法或阻塞机制并不响应中断。但可以通过与中断类似的手段,来确保可以停止这些线程,前提是我们需要清楚地知道线程为什么会阻塞。
下例展现了一项用来封装非标准取消的技术,为了方便终止一个用户的连接或关闭服务器。重写了interrupt方法。
public class ReaderThread extends Thread {private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */
}
}
public void processBuffer(byte[] buf, int count) {
}
}
用newTaskFor钩子方法封装非标准取消
在上个例子中,可以使用newTaskFor钩子函数来改进用来封装非标准取消的方法。这是java 6 中添加到ThreadPoolExecutor的新特性,当提交一个Callable给ExecutorService时,submit返回一个Future,可以用Future来取消任务。newTaskFor钩子是一个工厂方法,创建Future来代表任务,它返回一个RunnableFuture接口,它扩展了Future和Runnable由FutureTask来实现。
自定义的任务Future可以重写canel方法,实现日志或收集取消的统计信息。也可以通过重写Thread.interrupt()实现上面的非标准取消功能。
public interface CancellableTask<T> extends Callable<T> {void cancel();
RunnableFuture<T> newTask();
}
public class CancellingExecutor extends ThreadPoolExecutor{
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
如果SocketUsingTask通过自身Future被取消,执行线程会被中断,这提高了任务对取消的响应性,这样做在保证响应取消的同时,不仅可以安全地调用可中断方法,还可以调用阻塞中的Socket I/O方法。
二、停止基于线程的服务
由于java不提供退出线程惯用的方法,所以需要自行编码结束。实践指出,我们不应该操控某个线程--中断它、改变它的优先级等等,除非你拥有这个线程。 线程通过一个Thread对象表示,线程的所有权也是不能被传递的,但线程可以被自由的共享。
一般的应用程序会有三个部分组成:应用程序拥有服务,服务拥有工作线程,但应用程序并不拥有工作线程,因此应用程序如果想控制线程只能通过服务来处理。就像线程池拥有工作者线程一样。服务比如ExecutorService应该提供生命周期方法来关闭它自己并关闭它拥有的线程。
//这是一个多生产者,单消费者设计,日志信息通过BlockingQueue移交给日志线程public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
//内部类
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
另一种更高级的方法。复杂的程序可能把会ExecutorService封装在一个更高层级的服务中,通过增加链接,把所有权链从应用程序扩展到服务,再到线程,每一个链上的成员管理它所拥有的服务或线程的生命周期。
public class LogService1 {private final ExecutorService exec =
public void start(){}
public void stop() throws InterruptedException{
exec.shutdown();
exec.awaitTermination(timeout, unit);
writer.close();
}//LogService委托给ExecutorService执行,LogService管理自己的生命周期
public void log(String msg){
try{
exec.execute(new WriteTask(msg));
}catch(Exception e){}
}
}
致命药丸
另一种保证生产--消费服务关闭的方式是使用poison pill:一个可识别的对象,置于队列中,意味着“当你得到它时或得到一定数量时,停止一切工作”。这种方式适合在生产--消费数量已知的情况下使用。不过在生产--消费者数量较大时很难处理,致命药丸只在无限队列中工作时,才是可靠的。
//生产者线程class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* fall through */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}//消费者线程
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void start() {
producer.start();
consumer.start();
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
只执行一次的服务
如果一个方法需要处理一批任务,并在所有任务结束前不会返回,那么可以通过私有的Executor来简化服务的生命周期管理,其中Executor的寿命限定在该方法中(通常会用到invokeAll和invokeAny方法):向每个主机提交任务,在这之后,当所有检查邮件的任务完成后,会关闭Executor,并等待结束。
public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
//为了从内部Runnable访问hasNewMail标志,它必须是final类型的,才能避免被更改
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts)
exec.execute(new Runnable() {
public void run() {
if (checkMail(host))
hasNewMail.set(true);
}
});
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
TrackingExecutor任务跟踪
但这个方法会强制中断正在运行的任务,也就是无法区分哪些任务正在执行中,哪些执行完了,必须自己通过设置检查点来区分,如果不处理可能会造成数据的不一致性。
但可以通过扩展AbstractExecutorService来区分取消和中止的任务。 如TrackingExecutor可以识别那些已经开始,但没有正常结束的任务。任务必须在返回时保存线程的中断状态。TrackingExecutorService例子说明了为后续执行来保存未完成的任务。
public class TrackingExecutor extends AbstractExecutorService {private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
//返回被取消(已经开始,但没有正常结束)的任务清单
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}public abstract class WebCrawler {
private volatile TrackingExecutor exec;
@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();
private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;
public WebCrawler(URL startUrl) {
urlsToCrawl.add(startUrl);
}
public synchronized void start() {
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) submitCrawlTask(url);
urlsToCrawl.clear();
}
public synchronized void stop() throws InterruptedException {
try {
saveUncrawled(exec.shutdownNow());
if (exec.awaitTermination(TIMEOUT, UNIT))
saveUncrawled(exec.getCancelledTasks());
} finally {
exec = null;
}
}
protected abstract List<URL> processPage(URL url);
private void saveUncrawled(List<Runnable> uncrawled) {
for (Runnable task : uncrawled)
urlsToCrawl.add(((CrawlTask) task).getPage());
}
private void submitCrawlTask(URL u) {
exec.execute(new CrawlTask(u));
}
private class CrawlTask implements Runnable {
private final URL url;
CrawlTask(URL url) {
this.url = url;
}
private int count = 1;
boolean alreadyCrawled() {
return seen.putIfAbsent(url, true) != null;
}
void markUncrawled() {
seen.remove(url);
System.out.printf("marking %s uncrawled%n", url);
}
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted())
return;
submitCrawlTask(link);
}
}
public URL getPage() {
return url;
}
}
}
三、处理反常的线程终止
导致线程dead的主要原因是RuntimeException,因为这种异常错误是不可修复的。
下面的例子阐述了如何在线程池内部构建一个工作者线程,如果任务抛出了一个未检查的异常,它将允许线程终结,但是会首先通知框架,线程已经终结。然后,框架可能会用新的线程取代这个工作线程,也可能不这么做,因为线程池也许正在关闭,抑或当前已有足够多的线程,能够满足需要了。ThreadPoolExecutor和Swing使用这项技术来确保那些不能正常运转的任务不会影响到后续任务的执行。
需查异常的处理
典型线程池的工作者线程的构建
public void run(){Throwable thrown = null;
try{
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(Throwable e){
thrown = e;
}finally{
threadExited(this, thrown);
}
}
不需查异常的处理
上面讲到了不需查异常的处理,线程的API同样提供了UncaughtExceptionHandler工具,便于监测到线程因不需查异常引起的dead。这两个方案互为补充,可以有效防止线程的泄漏问题。
当一个线程因为不需查异常退出时,JVM会把这个事件报告给应用程序自定义的Handler。如果handler不存在,默认会用System.err打印信息。至于hander如何处理取决于应用程序对服务质量的要求了,一般会直接写入日志。
public class UEHLogger implements Thread.UncaughtExceptionHandler {public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}
为了给线程设置UncaughtExceptionHandler需要向ThreadPoolExecutor的构造函数中提供一个ThreadFactory(只有线程的所有者能够改变其UncaughtExceptionHandler)。只有通过execute提交的任务,才能将它抛出的异常送给Handler,而通过submit提交的任务,抛出的任何异常,都会认为是任务返回状态的一部分。如果一个由submit提交的任务以异常作为终结,这个异常会被Future.get重抛出。包装在ExecutionException中。
在一个长时间运行的应用程序中,所有的线程都要给未捕获异常设置一个处理器,这个处理器至少要将异常信息记入日志中。
四、JVM关闭
JVM即可以正常关闭,也可以通过System.exit或是Crtl-C来强制关闭。但JVM也可以通过Runtime.halt或者“杀死”JVM的操作系统进程被强行关闭。`
关闭钩子
在正常的关闭中,JVM首先启动所有已注册的shutdown hook,shutodwn hook是使用Runtime.addShutdownHook(new Thread())注册的尚未开始的线程,JVM不能保证shutodwn hook的开始顺序。当所有shutodwn hook结束的时候,如果runFinalizerOnExit为true,可以选择运行finalizer,之后停止。JVM不会尝试停止或中断任何关闭时仍在运行中的应用程序线程,它们在JVM最终终止时被强制退出。如果shutdown hook或finalizer没有完成,那么正常的关闭进程会“挂起”并且JVM必须强制关闭,这时JVM只会运行强制关闭程序,其它的线程根本不管,包括shutodwn hook。
shutodwn hook应该是线程安全的。shutodwn hook可以用于服务或应用程序的清理,比如del临时文件。并且shutodwn hook全部是并发执行的。关闭日志文件可能引起其他需要使用日志服务的shutodwn hook麻烦,所以shutodwn hook不应该依赖于可能被应用程序或其他shutodwn hook关闭的服务,所有的服务应使用唯一的shutodwn hook。确保关闭的动作在单线程上顺序发生。也就是说一般的应用程序只会注册一人shutdown Hook。
注册shutodwn hook来停止日志服务
public void start(){Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
LogService.this.stop();
}
});
}
精灵线程
有时需要创建一个线程,执行一些辅助工作,但这不希望这个线程的存在阻碍JVM的关闭,这里就需要用到daemon thread。线程分为:普通线程和精灵线程。JVM启动的时候创建所有的线程,除了主线程外,其他的都是精灵线程(比如GC)。
他们的区别仅仅在于退出时会发生什么,当一个普通线程退出时,JVM会检查一个运行中线程的详细清单,如果仅有daemon thread时,它会发起正常的退出。当JVM停止时,所有仍然存在的daemon thread都会被抛弃--不会执行finally块,也不会释放栈--JVM直接退出。
daemon thread是好用于“家务管理”的任务,比如一个背景线程可以从内存的缓存中周期性的移除过期的访问。应用程序中,daemon thread不能替代对服务的生命周期恰当、良好的管理。
Finalizer
有些不再需要的资源,GC会自动回收,但有些比如Socket句柄,不需要时,必须显式是归还给操作系统。为了在这方面提供帮助,GC对那些具有特殊finalize方法的对象进行特殊对待,在GC获得它们后,finalize被调用,这样就能保证持久化的资源可以被释放。正确的书写finalizer很困难,所以应该避免使用finalizer。