《Java并发编程实战》 任务的取消

words: 7.9k    views:    time: 31min

如果外部代码能在某个操作正常完成之前将其置入完成状态,那么这个操作就可以称为可取消的,行为良好的软件都应该能很完善地处理失败、关闭和取消等过程。 一个可取消的任务必须拥有取消策略,在这个策略中定义:

  • 其他代码如何请求取消该任务;
  • 任务在何时检查是否已经请求了取消;
  • 在响应取消请求时应该执行哪些操作;

Java中并没有提供任何机制来安全地终止线程,但它提供了一种协作机制,即通过设置某个已请求取消的标志,然后任务执行过程中定期地查看该标志,如果设置了,那么任务提前结束

1. 自定义取消

1.1. 示例:取消素数生成器

PrimeGenerator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private class PrimeGenerator implements Runnable{

private final List<BigInteger> primes = new ArrayList<BigInteger>();

private volatile boolean cancelled;

public void run(){
BigInteger p = BigInteger.ONE;
while(!cancelled){
p = p.nextProbablePrime();
synchronized(this){
primes.add(p);
}
}
}

public void cancel(){
cancelled = true;
}

public synchronized List<BigInteger> get(){
return new ArrayList<BigInteger>(primes);
}
}

PrimeGenerator使用一种简单的取消策略:设定一个取消标志,如果被置位了,则说明收到了取消请求。然后在每次搜索素数前首先检查是否存在取消请求,如果存在则取消。因此,客户端可以通过调用cancel来请求取消,比如:

secondPrimes
1
2
3
4
5
6
7
8
9
10
List<BigInteger> secondPrimes() throws InterruptedException{
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try{
Thread.sleep(1);
}finally{
generator.cancel(); // 保证即使sleep被中断,也能取消素数生成器的执行
}
return generator.get();
}

PrimeGenerator的取消机制最终会使得素数生成器的任务退出,但在退出的过程中需要花费一定的时间。比如示例中通常不会刚好在运行1秒钟后停止,因为在请求取消的时刻和下一次检查之间存在着延迟。更严重的,如果使用这种方法的任务调用了一个阻塞方法,那么任务可能永远不会检查取消标志,因此永远不会结束

BrokenPrimeProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private class BrokenPrimeProducer extends Thread{

private final BlockingQueue<BigInteger> queue;

private volatile boolean cancelled = false;

BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
this.queue = queue;
}

public void run(){
try{
BigInteger p = BigInteger.ONE;
while(!cancelled){
queue.put(p = p.nextProbablePrime());
}
}catch(InterruptedException e){

}
}

public void cancel(){
cancelled = true;
}
}

BrokenPrimeProducer中如果生产的速度超过了消费者的处理速度,队列将被填满,put方法将会阻塞。当阻塞在put方法上时,如果消费者停止消费并希望取消生产者任务,它会调用cancel()方法来设置cancelled标志。但此时已经无法从阻塞的put方法中恢复过来,因此将永远不能检查这个标志。

consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void consumePrimes() throws InterruptedException{

BlockingQueue<BigInteger> primes = new LinkedBlockingQueue<BigInteger>();

BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
producer.start();

try{
while(needMorePrimes()){
consume(primes.take());
}
}finally{
producer.cancel(); // 如果producer已经阻塞,将无法取消
}
}

2. 中断

每个线程中都有一个中断状态,当中断线程时,其状态将被置为ture,并且,Thread中提供了设置以及查询线程中断状态的方法:

Thread
1
2
3
4
5
6
public class Thread {
public void interrupt(){} // 中断目标线程
public boolean isInterrupted(){} // 返回目标线程的中断状态
public static boolean interrupted(){} // 清除当前线程的中断状态(清除中断状态的唯一方法),并返回之前的值
//...
}

在使用interrupted时要注意,因为它会清除当前线程的中断状态。如果返回true,说明收到过中断请求,所以除非想屏蔽这个中断,否则应该对它进行处理:比如抛出InterruptedException,或者再次调用interrupt来恢复中断状态(这样还可以保留中断请求,并在之后的某个时候进行检查和处理)。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后由具体的线程任务来决定是否检查以及如何处理。也就是说,中断并不会真正的中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时机做出中断响应。如果任务不进行检查处理,那中断状态的设置也只是设置而已。

常见的阻塞方法有Thread.sleepObject.wait等,它们会检查线程中断,并在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。

2.1. 改进:中断素数生成器

通常,中断是实现取消最合理的方式,如果任务能够响应中断,那么可以使用中断作为取消机制,并且可以利用类库提供的中断支持。上面的示例中说明了一些自定义的取消机制无法与可阻塞的库函数实现良好的交互,而使用终端则可以很容易地解决。

PrimeProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PrimeProducer extends Thread{

private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue){
this.queue = queue;
}

public void run(){
try{
BigInteger p = BigInteger.ONE;
while(!Thread.currentThread().isInterrupted()){
queue.put(p = p.nextProbablePrime());
}
}catch(InterruptedException e){
//直接退出
}
}

public void cancel(){
interrupt();
}
}

PrimeProducer中有两个位置可以检测中断:即阻塞的put方法,和循环开始处的判断。

由于调用了阻塞的put方法,因此可以不进行显示的检测,因为put会检测并响应中断,并抛出InterruptedException使任务退出。但在循环开始处执行检测会提高对中断的响应性,如果可阻塞的方法调用频率不高,则不足以获得足够的响应性。

2.2. 中断策略

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,并在必要时进行清理,通知某个所有者该线程已经退出。一个中断请求可能有一个或多个接收者,比如中断线程池中的某个工作线程,意味着取消当前任务,同时也意味着关闭工作线程。区分任务和线程对中断的响应是很重要的,任务不会在其自己拥有的线程中执行,而是在某个服务拥有的线程中执行。对于非线程所有者的代码来说(比如对于线程池而言,任何在线程池实现以外的代码),应该小心的保存中断状态,这样拥有线程的代码才能对中断做出响应。

.线程是系统提供的一个执行上下文,java中提供了一些对象帮我们管理线程并代理对进行的操作,比如Thread.start()就是开启一个执行上下文,这些对象就是线程的拥有者,即拥有线程的代码。如果我们要执行一段处理逻辑,就将这段逻辑封装成一个任务并委托给这些对象去挂到线程上执行,这段逻辑代码或者这个任务也就是非线程所有者的代码

.如果希望线程能够复用,那么线程拥有者代码可以在线程执行完一个任务后将线程挂起,然后等待下一个任务。那么问题来了:如果本来想结束服务,中断所有的工作线程,但是中断请求却被线程当前执行的任务代码吃掉了,并且未做任何处理,那么当任务结束后,线程拥有者将感知不到中断请求从而不能结束自己。因此,任务代码应该保留中断请求,这样当自己结束后可以交给线程拥有者处理

.线程池的设计中考虑到了这点,它定义和维护了一套自己的状态,并在很多节点进行检查,比如在新建和回收工作线程时,因此,即便任务中吃掉了中断请求,它也能根据自己的状态判定是否应该结束服务

这就是为什么大多数可阻塞的库函数只是抛出InterruptedException作为中断响应,因为它们永远不会在某个由自己拥有的线程中执行,因此它们为任务或者库代码采用了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,使调用栈上的上层代码可以采取进一步的操作。

2.3. 中断响应

处理中断通常有两种策略:传递异常,使自己的方法也成为可中断的方法;或者恢复中断状态,交给调用栈上层的代码进行处理;

如果不想或无法传递异常(可能任务是通过Runnable来定义的),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用interrupt来恢复中断状态。

不应该屏蔽InterruptedException,比如在catch中捕获异常却不做任何处理,除非代码中实现了线程的中断策略。虽然上面示例中PrimeGenerator屏蔽了中断,那是因为它已经知道线程将要结束,已经不需要上层代码进行处理。但大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

对于一些不支持取消但可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。这种情况下,它们应该在本地保存中断状态,并在返回前恢复,而不是捕获InterruptedException时恢复。如果过早地设置中断状态,则可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现状态被设置时会立即抛出。

getNextTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Task getNextTask(BlockingQueue<Taskgt> queue){
boolean interrupted = false;
try{
while(true){
try{
return queue.take();
}catch(InterruptedException e){
interrupted = true;
}
}
}finally{
if(interrupted){ // 返回前恢复中断状态的设置
Thread.currentThread().interrupt();
}
}
}

有时在取消过程中可能还涉及到其他状态,中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。例如,当一个由线程池拥有的工作线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则,它可能创建一个新线程将线程池恢复到合理的规模。

3. 示例:计时任务

需求:将给定的任务执行指定的时间,并保存异常信息

3.1. 通过外部线程中断

timedRun
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

public static void timedRun(Runnable currentTask,long timeout,TimeUnit unit){

final Thread currentThread = Thread.currentThread();

cancelExec.schedule(new Runnable(){
public void run(){
currentThread.interrupt();
}
}, timeout, unit);

currentTask.run();
}

timedRun可以在任意线程中调用,它无法知道这个调用线程的中断策略。可能线程就不响应中断,那么timedRun只能在任务结束时才返回,此时可能已经超时。也可能任务在超时之前完成,那么中断timedRun所在线程的请求将在任务结束后到达,而那时可能已经在运行其它的代码了,因此,结果一定是不好的。

3.2. 改进:通过专门线程中断

timedRun
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{

// 对任务进行包装
class RethrowableTask implements Runnable{
private volatile Throwable t;
public void run(){
try{
r.run();
}catch(Throwable t){
this.t = t;
}
}

void rethrow(){
if(t != null){
throw launderThrowable(t);
}
}
}
RethrowableTask task = new RethrowableTask();

// 新建一个专门的线程来执行任务
final Thread taskThread = new Thread(task);
taskThread.start();

// 对执行线程进行限时
cancelExec.schedule(new Runnable(){
public void run(){
taskThread.interrupt();
}
}, timeout, unit);

// 使当前线程join等待执行任务的线程结束
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}

timedRun在启动任务线程后,执行一个限时的join方法,同步等待这个线程结束或者超时。在join返回后,再检查任务是否有异常抛出。

自定义执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。但这只是保证了方法能限时返回,并不代表任务能限时结束,如果任务本身不响应中断,那么它将继续运行下去,而且由于依赖限时join,将无法知道任务是正常结束了还是join超时返回。

3.3. 改进:通过Future中断

timedRun
1
2
3
4
5
6
7
8
9
10
11
12
public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
Future<?> task = taskExec.submit(r);
try{
task.get();
}catch(TimeoutException e){
//取消任务
}catch(ExecutionException e){
throw launderThrowable(e.getCause()); // 如果任务中有异常,那么重新抛出
}finally{
task.cancel(true); // 如果任务还在运行,那么将被中断,但对已经结束的任务没有影响
}
}

submit会返回一个Future来描述任务的状态,另外,Future提供cancel方法,并带有一个boolean参数,判断是否对执行的任务进行中断,如果为false,就直接唤醒在get()上阻塞的任务,并返回一个取消异常,至于任务还是照样执行,只是结果不要了。而如果为true, 则是在唤醒等待结果的线程之前先对执行的任务发出一个中断请求,至于中断请求是否得到响应,则看具体任务的实现。

4. 不可中断的阻塞问题

并非所有的可阻塞方法或阻塞机制都能响应中断,对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求必须指定线程阻塞的原因

  • Java.io中的同步Socket:在服务器应用程序中,最常见的阻塞I/O形式就是对套接字的读取和写入。虽然InputStreamOutputStream中的readwrite等方法都不会响应中断,但通过关闭底层的套接字,可以使由于执行readwrite等方法而被阻塞的线程抛出一个SocketException

  • Java.io中的同步I/O:当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException,并关闭链路(这会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路上阻塞的线程都抛出AsynchronousCloseException

  • selector的异步I/O:如果一个线程在调用Selector.select方法时阻塞了,那么调用closewakeup方法会使线程抛出ClosedSelectorException并提前返回。

  • 获取某个锁:如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,它不会理会中断请求。但在Lock中提供了lockInterruptibly等方法,其允许在等待锁的同时仍能响应中断。

4.1. 重写 Thread.interrupt()

ReaderThread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ReaderThread extends Thread{

private final Socket socket;

private final InputStream in;

public ReaderThread(Socket socket) throws IOException{
this.socket = socket;
this.in = socket.getInputStream();
}

public void interrupt(){
try{
socket.close();
}catch(IOException e){
// ignore
}finally{
super.interrupt();
}
}

public void run(){
try{
byte[] buf = new byte[1000];
while(true){
int count = in.read(buf);
if(count < 0){
break;
}else if(count > 0){
processBuffer(buf,count);
}
}
}catch(IOException e){

}
}
}

对于非标准的取消操作,一种办法是重写interrupt方法,将取消动作封装在Thread中。

比如ReaderThread管理了一个套接字连接,采用同步方式从该套接字中读取数据,并将接收到的数据传递给processBuffer。为了结束某个用户的连接或者关闭服务器,ReaderThread改写了interrupt方法,使其既能响应中断,也能关闭Socket

4.2. 定制 ThreadPoolExecutor.newTaskFor

1.定义CancellableTask扩展Callable接口,增加一个cancel方法和一个newTask工厂方法来构造RunnableFuture

CancellableTask
1
2
3
4
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}

2.定义CancellingExecutor扩展ThreadPoolExecutor,改写newTaskFor返回自定义的Future

CancellingExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CancellingExecutor extends ThreadPoolExecutor{

public CancellingExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

//...

protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
if(callable instanceof CancellableTask){
return ((CancellableTask<T>)callable).newTask();
}else{
return super.newTaskFor(callable);
}
}
}

3.定义SocketUsingTask实现CancellableTask,在cancel中关闭套接字,并在newTask构造的RunnableFuture中重写Future.cancel(),即调用自己的cancel进行套接字关闭。

SocketUsingTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class SocketUsingTask<T> implements CancellableTask<T>{

private Socket socket;

protected synchronized void setSocket(Socket s){
socket = s;
}

public synchronized void cancel(){
try{
if(socket != null){
socket.close();
}
}catch(IOException e){

}
}

public RunnableFuture<T> newTask(){
return new FutureTask<T>(this){
public boolean cancel(boolean mayInterruptIfRunning){
try{
SocketUsingTask.this.cancel();
}finally{
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}

5. 停止基于线程的服务

应用程序通常会拥有多个线程服务,比如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也应该结束,以便使jvm正常退出。

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务可以拥有工作线程,但应用程序并不能拥有工作线程。因此,应用程序不能直接停止工作线程,相反,服务应该提供生命周期方法,来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,再由服务关闭所拥有的线程。比如ExecutorService中提供的shutdownshutdownNow方法。

6. 示例:日志服务

LogWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LogWriter {

private final BlockingQueue<String> queue;

private final LoggerThread logger;

public LogWriter(PrintWriter writer){
this.queue = new LinkedBlockingQueue<String>();
this.logger = new LoggerThread(writer);
}

public void start(){
logger.start();
}

public void log(String msg) throws InterruptedException{
queue.put(msg);
}

private class LoggerThread extends Thread{
private final PrintWriter writer;

public LoggerThread(PrintWriter writer){
this.writer = writer;
}
//...
public void run(){
try{
while(true){
writer.println(queue.take());
}
}catch(InterruptedException e){

}finally{
writer.close();
}
}
}
}

LogWriter给出了一个简单的日志服务,产生日志的线程由LogWriter通过BlockingQueue将日志交给打印线程,并由打印线程写出。

要停止打印线程很容易,因为它调用的take能响应中断,因此,只需中断打印线程就能停止服务。但是,这样直接关闭会丢失队列中正在等待写出的日志,而其他线程将在log上阻塞,因为日志队列很快会填满。因此,当取消一个生产—消费操作时,应该同时取消生产者和消费者。而示例中由于生产者并不是专门的线程,因此很难取消。

6.1. 改进:设置关闭标志

可以设置一个关闭标志,以避免进一步提交日志,并在收到关闭请求后,消费者将队列中的所有消息写入日志,并解除所有在log上阻塞的线程

LogWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class LogWriter {

private final BlockingQueue<String> queue;

private final LoggerThread logger;

private PrintWriter writer;

private boolean isShutdown;

private int reservations;

public LogWriter(PrintWriter writer){
this.queue = new LinkedBlockingQueue<String>();
this.logger = new LoggerThread();
}

public void start(){
logger.start();
}

public void stop(){
synchronized(this){
isShutdown = true;
logger.interrupt();
}
}

// syn(检测是否已关闭,否则计数+1),然后put日志
public void log(String msg) throws InterruptedException{
synchronized(this){
if(isShutdown){
throw new IllegalStateException("...");
}
++reservations;
}
queue.put(msg);
}

// syn(检测是否已关闭,并且日志消费完),否则take日志,syn(计数-1),最后print日志
private class LoggerThread extends Thread{
public void run(){
try{
while(true){
try{
synchronized(LogWriter.this){
if(isShutdown && reservations == 0){
break;
}
}
String msg = queue.take();
synchronized(LogWriter.this){
--reservations;
}
writer.println(msg);
}catch(InterruptedException e){

}
}
}finally{
writer.close();
}
}
}
}

LogWriter提供可靠的关闭操作需要解决竞态条件问题,因此,要使日志消息的提交操作成为原子操作。但是,并不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。

可以通过原子方式来检查关闭请求,并有条件地递增一个计数器。通过关闭标志和计数器,可以确保在关闭日志服务时,所有已经提交的日志都能被写出。

6.2. 改进:委托ExecutorService

简单的程序可以直接在main函数中启动和关闭全局的ExecutorService,但在复杂的程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务能提供自己的生命周期方法。

通过封装ExecutorService,可以将线程的所有权从ExecutorService扩展到服务以及应用程序,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

LogService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class LogService {

private final ExecutorService exec = Executors.newSingleThreadExecutor();

public void start(){

}

public void stop() throws InterruptedException {
try{
exec.shutdown();
exec.awaitTermination(timeout, unit);
}finally{
writer.close();
}
}

public void log(String msg){
try{
exec.execute(new WriteTask(msg));
}catch(RejectedExecutionException e){

}
}
}

7. 单次任务服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,这个Executor的生命周期由这个方法来控制。

checkMail
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit) throws InterruptedException{
ExecutorService exec = Executors.newCachedThreadPool();
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try{
for(final String host : hosts){
exec.execute(new Runnable(){
public void run(){
if(checkMail(host)){
hasNewMail.set(true);
}
}
});
}
}finally{
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}

checkMail能在多台主机上并行地检查新邮件,它创建一个私有的Executor,并向每台主机提交一个任务。当所有邮件检查任务都执行完成后,关闭Executor,等待结束。

8. ExecutorService.shutdownNow的问题

通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,但是无法知道哪些任务已经开始但尚未结束。

TrackingExecutor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TrackingExecutor extends AbstractExecutorService{

private final ExecutorService exec;

private final Set<Runnable> taskCancellAtShutdown = Collections.synchronizedSet(new HashSet<Runnable>());

public TrackingExecutor(ExecutorService exec){
this.exec = exec;
}

public List<Runnable> getCancelledTasks(){
if(!exec.isTerminated()){
throw new IllegalStateException("");
}
return new ArrayList<Runnable>(taskCancellAtShutdown);
}

public void execute(final Runnable runnable){
exec.execute(new Runnable(){
public void run(){
try{
runnable.run();
}finally{
if(isShutdown() && Thread.currentThread().isInterrupted()){
taskCancellAtShutdown.add(runnable);
}
}
}
});
}

//将ExecutorService的其他方法委托给exec
}

TrackingExecutor装饰了ExecutorService,在execute中记录哪些任务是在关闭后取消的。在Executor结束后,getCancelledTasks返回被取消的任务清单。由于在finally中检测中断状态,因此如果任务要被记录,在返回时必须维持线程的中断状态,设计良好的任务都应该这样实现。

TrackingExecutor中存在一个竞态条件,在任务执行完最后一条指令以及线程池将任务标记为结束的两个时刻之间,线程池可能被关闭,这样一些被认为取消的任务可能实际上已经完成,就可能导致误报(任务实际已经结束,但线程池关闭了,不会再进行记录)。不过如果任务两次执行的结果相同,例如网络爬虫程序,则可以忽略,否则,需要考虑这个风险。

.对于这里TrackingExecutor存在的竞态问题,如果可以将中断时保存取消的任务这件事交给任务自己去做,即任务过程中如果检测到中断就将自己记录到取消任务中,并且任务自己清楚任务到底完成到什么进度。但这里对于TrackingExecutor,提交给它的并不是定制的任务,它只能通用的假设任务的中断处理方式是向上传递

  • TrackingExecutor的使用示例:网页爬虫

当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动

WebCrawler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public abstract class WebCrawler {
private volatile TrackingExecutor exec;

private final Set<URL> urlsToCrawl = new HashSet<URL>();

public synchronized void start(){
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for(URL url : urlsToCrawl){
submitCrawlTask(url);
}
urlsToCrawl.clear();
}

public synchronized void stop() throws InterruptedException{
try{
saveUncrawled(exec.shutdownNow()); // 未开始的任务
if(exec.awaitTermination(timeout, unit)){
saveUncrawled(exec.getCancelledTasks()); // 已开始但被中断的任务
}
}finally{
exec = null;
}
}

protected abstract List<URL> processPage(URL url);

private void saveUncrawled(List<Runnable> uncrawled){
for(Runnable task : uncrawled){
urlsToCrawl.add(((CrawlTask)task).getPage());
}
}

private void submitCrawlTask(URL url){
exec.execute(new CrawlTask(url));
}

private class CrawlTask implements Runnable{
private final URL url;

public CrawlTask(URL url) {
this.url = url;
}

public void run(){
for(URL link : processPage(url)){
if(Thread.currentThread().isInterrupted()){
return;
}
submitCrawlTask(link);
}
}

public URL getPage(){
return url;
}
}
}

9. 线程的异常终止

导致线程提前死亡的常见原因就是RuntimeException,由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们不会被捕获,也不会在调用栈中逐层传递,而是默认在控制台中输出栈追踪信息,并终止线程,从而造成线程泄露

如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知服务该线程已经终结。然后,服务可能会用新的线程来代替,也可能不会,因为可能线程池正在关闭,或者当前已有足够多的线程满足需要。ThreadPoolExecutorSwing都是通过这项技巧来确保行为糟糕的任务不会影响到后续任务的执行。

标准线程池允许当发生未捕获异常时结束线程,由于使用try-finally来接收通知,因此当线程结束时,将有新的线程来代替它。但如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致混乱。

如下为典型的线程池工作者线程结构:

run
1
2
3
4
5
6
7
8
9
10
11
12
public void run(){
Throwable thrown = null;
try{
while(!isInterrupted()){
runTask(getTaskFromWorkQueue());
}
}catch(Throwable e){
thrown = e;
}finally{
threadExited(this,thrown);
}
}
  • UncaughtExceptionHandler

Thread提供了UncaughtExceptionHandler,它能检测出某个线程由于异常而终结的情况。当一个线程由于异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器。如果没有任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err,而通常的响应方式是将一个错误信息以及相应的栈追踪信息写入到日志中。

UncaughtExceptionHandler
1
2
3
public interface UncaughtExceptionHandler{
void UncaughtExcept(Thread t, Throwable e);
}

ThreadPoolExecutor的构造函数提供一个ThreadFactory,可以为池中的所有线程设置一个UncaughtExceptionHandler,但是,只有通过execute提交的任务,才能将抛出的异常交给异常处理器,而通过submit提交的任务,无论抛出的是未检查异常还是已检查异常,都将被任务视为返回状态的一部分。如果一个由submit提交的任务由于抛出了异常而结束,那么这个异常将在Future.get封装的ExecutionException中重新抛出。

10. 守护线程

线程可以分为普通线程守护线程,在JVM启动时创建的所有线程中,除主线程外,其他的都是守护线程,例如垃圾回收器以及其他执行辅助工作的线程。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。

普通与守护的差异仅在当线程退出时发生的操作,当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃,即不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

应该尽可能少地使用守护线程,很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务,那么将是一种危险的行为。

11. 关闭钩子

jvm既可以正常地关闭,也可以强行关闭。正常关闭的方式有多种,包括当最后一个非守护线程结束时,或者当调用System.exit时。也可以通过Runtime.halt,或者在操作系统中杀死进程来强行关闭。

关闭钩子就是通过Runtime.addShutdownHook注册的但尚未开始的线程,可以注册多个关闭钩子,但jvm并不保证它们的调用顺序,且jvm不会停止或中断任何在关闭时仍然运行的应用线程,它们将一起并发执行。当正常关闭时,jvm首先调用所有已注册的关闭钩子,当jvm最终结束时,这些应用线程将被强行结束,因此关闭钩子应该要保证是线程安全,且在访问共享数据时要小心的避免死锁。当强行关闭时,并不会运行关闭钩子,而只是关闭了jvm。

所以为了避免关闭钩子出现问题,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。一种可靠的建议是使用同一个关闭钩子,在钩子中执行一些列串行地操作。

12. 终结器

当不再需要内存资源时,可以通过垃圾回收器回收它们,但对于一些资源,例如文件句柄或套接字,当不再需要时,必须显示地交还给操作系统。因此,为了实现这个功能,java给对象定义了一个终结器finalize(),并且垃圾回收器在释放对象时也会检查并执行对象的finalize(),从而释放一些系统资源。

垃圾回收器的行为是不可控的,它的运行策略由jvm决定,因此建议是避免使用,绝大多情况下,都可以使用try-finally来代替,而且工作的更好。

.这个在《深入理解Java虚拟机》中有更详细的介绍,另外在线程池的实现中可以见到这个方法的使用,因为如果服务关闭时忘了通知它,那么它确实没有其它办法,只能在被GC回收时由垃圾回收器帮忙检查释放资源


参考:

  1. Copyright ©《java并发编程实战》