《Java并发编程实战》 任务的取消
words: 7.9k views: time: 31min如果外部代码能在某个操作正常完成之前将其置入完成状态,那么这个操作就可以称为可取消的,行为良好的软件都应该能很完善地处理失败、关闭和取消等过程。 一个可取消的任务必须拥有取消策略,在这个策略中定义:
- 其他代码如何请求取消该任务;
- 任务在何时检查是否已经请求了取消;
- 在响应取消请求时应该执行哪些操作;
Java中并没有提供任何机制来安全地终止线程,但它提供了一种协作机制,即通过设置某个已请求取消的标志,然后任务执行过程中定期地查看该标志,如果设置了,那么任务提前结束。
1. 自定义取消
1.1. 示例:取消素数生成器
1 | private class PrimeGenerator implements Runnable{ |
PrimeGenerator
使用一种简单的取消策略:设定一个取消标志,如果被置位了,则说明收到了取消请求。然后在每次搜索素数前首先检查是否存在取消请求,如果存在则取消。因此,客户端可以通过调用cancel
来请求取消,比如:
1 | List<BigInteger> secondPrimes() throws InterruptedException{ |
PrimeGenerator
的取消机制最终会使得素数生成器的任务退出,但在退出的过程中需要花费一定的时间。比如示例中通常不会刚好在运行1秒钟后停止,因为在请求取消的时刻和下一次检查之间存在着延迟。更严重的,如果使用这种方法的任务调用了一个阻塞方法,那么任务可能永远不会检查取消标志,因此永远不会结束。
1 | private class BrokenPrimeProducer extends Thread{ |
BrokenPrimeProducer
中如果生产的速度超过了消费者的处理速度,队列将被填满,put
方法将会阻塞。当阻塞在put
方法上时,如果消费者停止消费并希望取消生产者任务,它会调用cancel()
方法来设置cancelled
标志。但此时已经无法从阻塞的put
方法中恢复过来,因此将永远不能检查这个标志。
1 | void consumePrimes() throws InterruptedException{ |
2. 中断
每个线程中都有一个中断状态,当中断线程时,其状态将被置为ture,并且,Thread
中提供了设置以及查询线程中断状态的方法:
1 | public class Thread { |
在使用interrupted
时要注意,因为它会清除当前线程的中断状态。如果返回true
,说明收到过中断请求,所以除非想屏蔽这个中断,否则应该对它进行处理:比如抛出InterruptedException
,或者再次调用interrupt
来恢复中断状态(这样还可以保留中断请求,并在之后的某个时候进行检查和处理)。
当线程在非阻塞状态下中断时,它的中断状态将被设置,然后由具体的线程任务来决定是否检查以及如何处理。也就是说,中断并不会真正的中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时机做出中断响应。如果任务不进行检查处理,那中断状态的设置也只是设置而已。
常见的阻塞方法有Thread.sleep
和Object.wait
等,它们会检查线程中断,并在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出InterruptedException
,表示阻塞操作由于中断而提前结束。
2.1. 改进:中断素数生成器
通常,中断是实现取消最合理的方式,如果任务能够响应中断,那么可以使用中断作为取消机制,并且可以利用类库提供的中断支持。上面的示例中说明了一些自定义的取消机制无法与可阻塞的库函数实现良好的交互,而使用终端则可以很容易地解决。
1 | public class PrimeProducer extends Thread{ |
PrimeProducer
中有两个位置可以检测中断:即阻塞的put
方法,和循环开始处的判断。
由于调用了阻塞的put
方法,因此可以不进行显示的检测,因为put
会检测并响应中断,并抛出InterruptedException
使任务退出。但在循环开始处执行检测会提高对中断的响应性,如果可阻塞的方法调用频率不高,则不足以获得足够的响应性。
2.2. 中断策略
最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,并在必要时进行清理,通知某个所有者该线程已经退出。一个中断请求可能有一个或多个接收者,比如中断线程池中的某个工作线程,意味着取消当前任务,同时也意味着关闭工作线程。区分任务和线程对中断的响应是很重要的,任务不会在其自己拥有的线程中执行,而是在某个服务拥有的线程中执行。对于非线程所有者的代码来说(比如对于线程池而言,任何在线程池实现以外的代码),应该小心的保存中断状态,这样拥有线程的代码才能对中断做出响应。
.线程是系统提供的一个执行上下文,java中提供了一些对象帮我们管理线程并代理对进行的操作,比如Thread.start()就是开启一个执行上下文,这些对象就是线程的拥有者,即拥有线程的代码。如果我们要执行一段处理逻辑,就将这段逻辑封装成一个任务并委托给这些对象去挂到线程上执行,这段逻辑代码或者这个任务也就是非线程所有者的代码。
.如果希望线程能够复用,那么线程拥有者代码可以在线程执行完一个任务后将线程挂起,然后等待下一个任务。那么问题来了:如果本来想结束服务,中断所有的工作线程,但是中断请求却被线程当前执行的任务代码吃掉了,并且未做任何处理,那么当任务结束后,线程拥有者将感知不到中断请求从而不能结束自己。因此,任务代码应该保留中断请求,这样当自己结束后可以交给线程拥有者处理。
.线程池的设计中考虑到了这点,它定义和维护了一套自己的状态,并在很多节点进行检查,比如在新建和回收工作线程时,因此,即便任务中吃掉了中断请求,它也能根据自己的状态判定是否应该结束服务。
这就是为什么大多数可阻塞的库函数只是抛出InterruptedException
作为中断响应,因为它们永远不会在某个由自己拥有的线程中执行,因此它们为任务或者库代码采用了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,使调用栈上的上层代码可以采取进一步的操作。
2.3. 中断响应
处理中断通常有两种策略:传递异常,使自己的方法也成为可中断的方法;或者恢复中断状态,交给调用栈上层的代码进行处理;
如果不想或无法传递异常(可能任务是通过Runnable
来定义的),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用interrupt
来恢复中断状态。
不应该屏蔽InterruptedException
,比如在catch
中捕获异常却不做任何处理,除非代码中实现了线程的中断策略。虽然上面示例中PrimeGenerator
屏蔽了中断,那是因为它已经知道线程将要结束,已经不需要上层代码进行处理。但大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。
对于一些不支持取消但可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。这种情况下,它们应该在本地保存中断状态,并在返回前恢复,而不是捕获InterruptedException
时恢复。如果过早地设置中断状态,则可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现状态被设置时会立即抛出。
1 | public Task getNextTask(BlockingQueue<Taskgt> queue){ |
有时在取消过程中可能还涉及到其他状态,中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。例如,当一个由线程池拥有的工作线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则,它可能创建一个新线程将线程池恢复到合理的规模。
3. 示例:计时任务
需求:将给定的任务执行指定的时间,并保存异常信息
3.1. 通过外部线程中断
1 | private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1); |
timedRun
可以在任意线程中调用,它无法知道这个调用线程的中断策略。可能线程就不响应中断,那么timedRun
只能在任务结束时才返回,此时可能已经超时。也可能任务在超时之前完成,那么中断timedRun
所在线程的请求将在任务结束后到达,而那时可能已经在运行其它的代码了,因此,结果一定是不好的。
3.2. 改进:通过专门线程中断
1 | private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1); |
timedRun
在启动任务线程后,执行一个限时的join
方法,同步等待这个线程结束或者超时。在join
返回后,再检查任务是否有异常抛出。
自定义执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。但这只是保证了方法能限时返回,并不代表任务能限时结束,如果任务本身不响应中断,那么它将继续运行下去,而且由于依赖限时join
,将无法知道任务是正常结束了还是join
超时返回。
3.3. 改进:通过Future中断
1 | public static void timedRun(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{ |
submit
会返回一个Future
来描述任务的状态,另外,Future
提供cancel
方法,并带有一个boolean
参数,判断是否对执行的任务进行中断,如果为false,就直接唤醒在get()上阻塞的任务,并返回一个取消异常,至于任务还是照样执行,只是结果不要了。而如果为true, 则是在唤醒等待结果的线程之前先对执行的任务发出一个中断请求,至于中断请求是否得到响应,则看具体任务的实现。
4. 不可中断的阻塞问题
并非所有的可阻塞方法或阻塞机制都能响应中断,对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求必须指定线程阻塞的原因。
Java.io中的同步Socket:在服务器应用程序中,最常见的阻塞I/O形式就是对套接字的读取和写入。虽然
InputStream
和OutputStream
中的read
和write
等方法都不会响应中断,但通过关闭底层的套接字,可以使由于执行read
或write
等方法而被阻塞的线程抛出一个SocketException
。Java.io中的同步I/O:当中断一个正在
InterruptibleChannel
上等待的线程时,将抛出ClosedByInterruptException
,并关闭链路(这会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException
)。当关闭一个InterruptibleChannel
时,将导致所有在链路上阻塞的线程都抛出AsynchronousCloseException
。selector的异步I/O:如果一个线程在调用
Selector.select
方法时阻塞了,那么调用close
或wakeup
方法会使线程抛出ClosedSelectorException
并提前返回。获取某个锁:如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,它不会理会中断请求。但在
Lock
中提供了lockInterruptibly
等方法,其允许在等待锁的同时仍能响应中断。
4.1. 重写 Thread.interrupt()
1 | public class ReaderThread extends Thread{ |
对于非标准的取消操作,一种办法是重写interrupt
方法,将取消动作封装在Thread
中。
比如ReaderThread
管理了一个套接字连接,采用同步方式从该套接字中读取数据,并将接收到的数据传递给processBuffer
。为了结束某个用户的连接或者关闭服务器,ReaderThread
改写了interrupt
方法,使其既能响应中断,也能关闭Socket
。
4.2. 定制 ThreadPoolExecutor.newTaskFor
1.定义CancellableTask
扩展Callable
接口,增加一个cancel
方法和一个newTask
工厂方法来构造RunnableFuture
1 | public interface CancellableTask<T> extends Callable<T> { |
2.定义CancellingExecutor
扩展ThreadPoolExecutor
,改写newTaskFor
返回自定义的Future
1 | public class CancellingExecutor extends ThreadPoolExecutor{ |
3.定义SocketUsingTask
实现CancellableTask
,在cancel
中关闭套接字,并在newTask
构造的RunnableFuture
中重写Future.cancel()
,即调用自己的cancel
进行套接字关闭。
1 | public abstract class SocketUsingTask<T> implements CancellableTask<T>{ |
5. 停止基于线程的服务
应用程序通常会拥有多个线程服务,比如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也应该结束,以便使jvm正常退出。
与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务可以拥有工作线程,但应用程序并不能拥有工作线程。因此,应用程序不能直接停止工作线程,相反,服务应该提供生命周期方法,来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,再由服务关闭所拥有的线程。比如ExecutorService
中提供的shutdown
和shutdownNow
方法。
6. 示例:日志服务
1 | public class LogWriter { |
LogWriter
给出了一个简单的日志服务,产生日志的线程由LogWriter
通过BlockingQueue
将日志交给打印线程,并由打印线程写出。
要停止打印线程很容易,因为它调用的take
能响应中断,因此,只需中断打印线程就能停止服务。但是,这样直接关闭会丢失队列中正在等待写出的日志,而其他线程将在log
上阻塞,因为日志队列很快会填满。因此,当取消一个生产—消费操作时,应该同时取消生产者和消费者。而示例中由于生产者并不是专门的线程,因此很难取消。
6.1. 改进:设置关闭标志
可以设置一个关闭标志,以避免进一步提交日志,并在收到关闭请求后,消费者将队列中的所有消息写入日志,并解除所有在log
上阻塞的线程
1 | public class LogWriter { |
为LogWriter
提供可靠的关闭操作需要解决竞态条件问题,因此,要使日志消息的提交操作成为原子操作。但是,并不希望在消息加入队列时去持有一个锁,因为put
方法本身就可以阻塞。
可以通过原子方式来检查关闭请求,并有条件地递增一个计数器。通过关闭标志和计数器,可以确保在关闭日志服务时,所有已经提交的日志都能被写出。
6.2. 改进:委托ExecutorService
简单的程序可以直接在main
函数中启动和关闭全局的ExecutorService
,但在复杂的程序中,通常会将ExecutorService
封装在某个更高级别的服务中,并且该服务能提供自己的生命周期方法。
通过封装ExecutorService
,可以将线程的所有权从ExecutorService
扩展到服务以及应用程序,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。
1 | public class LogService { |
7. 单次任务服务
如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor
来简化服务的生命周期管理,这个Executor
的生命周期由这个方法来控制。
1 | boolean checkMail(Set<String> hosts,long timeout,TimeUnit unit) throws InterruptedException{ |
checkMail
能在多台主机上并行地检查新邮件,它创建一个私有的Executor,并向每台主机提交一个任务。当所有邮件检查任务都执行完成后,关闭Executor,等待结束。
8. ExecutorService.shutdownNow的问题
通过shutdownNow
来强行关闭ExecutorService
时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,但是无法知道哪些任务已经开始但尚未结束。
1 | public class TrackingExecutor extends AbstractExecutorService{ |
TrackingExecutor
装饰了ExecutorService
,在execute
中记录哪些任务是在关闭后取消的。在Executor结束后,getCancelledTasks返回被取消的任务清单。由于在finally
中检测中断状态,因此如果任务要被记录,在返回时必须维持线程的中断状态,设计良好的任务都应该这样实现。
但TrackingExecutor
中存在一个竞态条件,在任务执行完最后一条指令以及线程池将任务标记为结束的两个时刻之间,线程池可能被关闭,这样一些被认为取消的任务可能实际上已经完成,就可能导致误报(任务实际已经结束,但线程池关闭了,不会再进行记录)。不过如果任务两次执行的结果相同,例如网络爬虫程序,则可以忽略,否则,需要考虑这个风险。
.对于这里TrackingExecutor存在的竞态问题,如果可以将中断时保存取消的任务这件事交给任务自己去做,即任务过程中如果检测到中断就将自己记录到取消任务中,并且任务自己清楚任务到底完成到什么进度。但这里对于TrackingExecutor,提交给它的并不是定制的任务,它只能通用的假设任务的中断处理方式是向上传递。
- TrackingExecutor的使用示例:网页爬虫
当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动
1 | public abstract class WebCrawler { |
9. 线程的异常终止
导致线程提前死亡的常见原因就是RuntimeException
,由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们不会被捕获,也不会在调用栈中逐层传递,而是默认在控制台中输出栈追踪信息,并终止线程,从而造成线程泄露
如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知服务该线程已经终结。然后,服务可能会用新的线程来代替,也可能不会,因为可能线程池正在关闭,或者当前已有足够多的线程满足需要。ThreadPoolExecutor
和Swing
都是通过这项技巧来确保行为糟糕的任务不会影响到后续任务的执行。
标准线程池允许当发生未捕获异常时结束线程,由于使用try-finally
来接收通知,因此当线程结束时,将有新的线程来代替它。但如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致混乱。
如下为典型的线程池工作者线程结构:
1 | public void run(){ |
- UncaughtExceptionHandler
Thread
提供了UncaughtExceptionHandler
,它能检测出某个线程由于异常而终结的情况。当一个线程由于异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler
异常处理器。如果没有任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err
,而通常的响应方式是将一个错误信息以及相应的栈追踪信息写入到日志中。
1 | public interface UncaughtExceptionHandler{ |
为ThreadPoolExecutor
的构造函数提供一个ThreadFactory
,可以为池中的所有线程设置一个UncaughtExceptionHandler
,但是,只有通过execute
提交的任务,才能将抛出的异常交给异常处理器,而通过submit
提交的任务,无论抛出的是未检查异常还是已检查异常,都将被任务视为返回状态的一部分。如果一个由submit
提交的任务由于抛出了异常而结束,那么这个异常将在Future.get
封装的ExecutionException
中重新抛出。
10. 守护线程
线程可以分为普通线程和守护线程,在JVM启动时创建的所有线程中,除主线程外,其他的都是守护线程,例如垃圾回收器以及其他执行辅助工作的线程。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。
普通与守护的差异仅在当线程退出时发生的操作,当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃,即不会执行finally
代码块,也不会执行回卷栈,而JVM只是直接退出。
应该尽可能少地使用守护线程,很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务,那么将是一种危险的行为。
11. 关闭钩子
jvm既可以正常地关闭,也可以强行关闭。正常关闭的方式有多种,包括当最后一个非守护线程结束时,或者当调用System.exit
时。也可以通过Runtime.halt
,或者在操作系统中杀死进程来强行关闭。
关闭钩子就是通过Runtime.addShutdownHook
注册的但尚未开始的线程,可以注册多个关闭钩子,但jvm并不保证它们的调用顺序,且jvm不会停止或中断任何在关闭时仍然运行的应用线程,它们将一起并发执行。当正常关闭时,jvm首先调用所有已注册的关闭钩子,当jvm最终结束时,这些应用线程将被强行结束,因此关闭钩子应该要保证是线程安全,且在访问共享数据时要小心的避免死锁。当强行关闭时,并不会运行关闭钩子,而只是关闭了jvm。
所以为了避免关闭钩子出现问题,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。一种可靠的建议是使用同一个关闭钩子,在钩子中执行一些列串行地操作。
12. 终结器
当不再需要内存资源时,可以通过垃圾回收器回收它们,但对于一些资源,例如文件句柄或套接字,当不再需要时,必须显示地交还给操作系统。因此,为了实现这个功能,java给对象定义了一个终结器finalize()
,并且垃圾回收器在释放对象时也会检查并执行对象的finalize()
,从而释放一些系统资源。
但垃圾回收器的行为是不可控的,它的运行策略由jvm决定,因此建议是避免使用,绝大多情况下,都可以使用try-finally
来代替,而且工作的更好。
.这个在《深入理解Java虚拟机》中有更详细的介绍,另外在线程池的实现中可以见到这个方法的使用,因为如果服务关闭时忘了通知它,那么它确实没有其它办法,只能在被GC回收时由垃圾回收器帮忙检查释放资源。
参考:
- Copyright ©《java并发编程实战》