《Java并发编程实战》 任务的执行
words: 5.1k views: time: 20min任务通常是一些抽象且离散的工作单元,通过把程序中的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,并提供一种自然的并行工作结构来提升并发性。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性 。程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。当负荷过载时,程序的性能应该是逐渐降低,而不是直接失败。
理想情况下,可以找出清晰的任务边界,各个任务之间是相互独立的,即任务不依赖于其他任务的状态、结果或边界效应。独立性有助于并发,因为如果存在足够多的处理资源,那么这些独立的任务就可以并行执行,这也为调度与负载均衡等的实现提供了更好的灵活性。
1. 示例:处理请求
需求:将每个请求视为一个独立的任务,进行处理
1.1. 串行执行
1 | public class SingleThreadWebServer { |
SingleThreadWebServer
由于是单线程处理,在等待I/O操作完成时,CPU将处于空闲状态,因此服务器的资源利用率非常低。
通常,在web请求的处理中都会包含一组不同的运算与I/O操作,服务器必须处理套接字I/O以读取请求和写回响应,而这些操作通常会由于网络拥塞或联通性问题而被阻塞。
1.2. 改进:线程执行
1 | public class ThreadPerTaskWebServer { |
ThreadPerTaskWebServer
中以独立的请求为边界,创建线程进行处理。但线程的创建与销毁并非没有代价,并且活跃的线程会消耗系统资源。如果可运行的线程数多于可用处理器数,那么有些线程将会闲置,大量的空闲线程将会占用内存,给垃圾回收器带来压力。另外,大量线程在竞争CPU资源时还将产生额外的性能开销,如果已经有足够多的线程使所以cpu保持忙碌状态,那么更多的线程只会降低性能。
.一般如果任务是cpu密集型的不建议创建超过cpu数量的线程,而如果任务中的I/O操作比较多,则可以创建较多线程,因为I/O的读写速度远小于cpu的处理速度,cpu在等待I/O的时候其实是闲置的,因此创建更多的任务能提高cpu的利用率 。
2. Executor
1 | public interface Executor{ |
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。在java类库中,任务执行的主要抽象不是Thread,而是Executor。虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来。
Executor
使用Runnable
来表示任务,它可以支持多种不同类型的任务执行策略,另外还提供了对生命周期的支持,以及统计信息搜集、应用程序管理机制和性能检测等机制。通过使用Executor
,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。但是,如果无法正确地关闭Executor,那么JVM将无法结束,因为JVM只有在所有非守护线程全部终止后才会退出。
2.1. 改进:Executor执行
1 | public class TaskExecutorWebServer { |
TaskExecutorWebServer
中通过Executor
将任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor
实现,就可以改变服务的行为。比如,为每一个请求都创建一个新线程:
1 | public class TaskExecutorWebServer implements Executor{ |
或者类似于单线程的行为,以同步的方式执行每个任务:
1 | public class TaskExecutorWebServer implements Executor{ |
3. ExecutorService
ExecutorService
扩展了Executor
接口,添加了一些管理生命周期的方法,以及一些用于任务提交的便利方法
1 | public interface ExecutorService extends Executor { |
ExecutorService
的生命周期有3个状态:运行、关闭、终止,在初始化后处于运行状态。
所有任务都完成后,ExecutorService
进入终止状态,可以调用awaitTermination
来等待ExecutorService
到达终止状态,或者通过调用isTerminated
来轮询ExecutorService
是否已经终止。通常awaitTermination
与shutdown
配合调用,以同步等待ExecutorService
关闭。
ThreadPoolExecutor
实现了ExecutorService
,它维护了一个工作队列和一组工作线程worker
,队列中保存了所有等待执行的任务,worker
的任务很简单:即从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
java类库中提供了一些创建线程池的静态工厂,但一般不建议使用,其实通过构造器可以根据实际场景加更灵活地创建
Executors.newFixedThreadPool
:创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池的最大数量,然后线程池的规模不再变化,如果某个线程由于Exception
而意外结束,那么线程池会补充一个新的线程。Executors.newCachedThreadPool
:创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,则回收空闲的线程,而当需求增加时,则添加新的线程,线程池的规模不存在任何限制。Executors.newSingleThreadExecutor
:是一个单线程的Executor,它创建单个工作线程来执行任务,如果这个线程异常结束,则创建一个新线程来替代,它可以确保任务按照队列中的顺序来串行执行。Executors.newScheduledThreadPool
:创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务。
3.1. 改进:解决Executor的关闭问题
1 | public class LifecycleWebServer{ |
4. 定时任务
Timer
负责管理延迟或周期任务,然而存在一些缺陷。Timer
在执行所有定时任务时只会创建一个线程,若某个任务的执行时间过长,那么将破坏其他TimerTask
的定时准确性,而且Timer
是基于绝对时间而不是相对时间的调度机制,因此,任务的执行对系统的时钟变化很敏感。另外Timer
线程并不捕获异常,如果TimerTask
抛出未检查的异常,定时线程将终止,Timer
也不会恢复线程,因此已经被调度但尚未执行的TimerTask
将不会再执行,新的任务也不能被调度。
ScheduledThreadPoolExecutor
可以解决这些问题,它只基于相对时间进行调度,而且如果调度线程异常终止,它会启动新的线程来代替。
ScheduledThreadPoolExecutor
内部依赖一个私有实现的DelayedWorkQueue
,来进行任务的调度。其实,通过DelayQueue
也可以构建自己的调度服务。DelayQueue
中管理着一组Delayed
对象,每个Delayed
对象有一个相应的延迟时间,只有某个元素逾期后,才能执行take
操作,其返回的对象将根据延迟时间进行排序。
5. 示例:页面渲染
需求:渲染一个Html页面,假设:页面只包含标签文本,以及预定义大小的图片和URL。
5.1. 串行渲染
串行处理是最简单的方法,这很容易实现,程序只需将输入中的每个元素渲染一次。但这样用户体验很差,他们必须等待很长时间,直到所有渲染完成。
好一点的办法是先渲染文本,同时为图像预留出占位空间。在处理完文本后,再开始下载图像,并将它们渲染到对应的占位空间。
1 | public class SingleThreadRender { |
图像的下载过程大部分时间都是在等待I/O操作,在这期间CPU几乎是空闲的。因此,这种方式没有充分的利用CPU,使得用户在看到最终页面之前要等待很长时间。因此,可以将问题分解为多个独立的任务并发执行,从而获得更高的CPU利用率和灵敏度。
5.2. 改进:Future实现
Future
表示一个任务的生命周期,其提供一些方法来判断任务是否已经完成或取消、获取任务的结果,或者取消任务。
Executor
框架使用Runnable
作为其基本的任务抽象,但Runnable
有很大的局限性,虽然结果可以写到日志文件或者放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。Callable
是一种更好的抽象,它将返回一个值,并可以抛出异常。
.Executor
会将Runnable
或Callable
统一适配成FutureTask
,其实,FutureTask
就是一个适配器,它实现了Runnable
接口并接收一个Callable
实例。另外它也是一个装饰器,它同时实现了Future
接口,所以它拥有对任务生命周期的控制。
Executor
执行的任务有4个生命周期阶段:创建、提交、开始和完成。有时一些任务可能要执行很长的时间,因此可能希望能够取消这些任务。在Executor
中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当他们能响应中断时(即任务主动检测中断请求,并在异常处理中结束自己),才能取消。
针对上面的问题,可以将渲染过程分为两个任务,一个是渲染所有的文本(cpu),另一个是下载所有的图像(I/O)。
1 | public class FutureRenderer { |
FutureRenderer
让渲染文本与下载图像并发执行,当渲染完文本后再获取下载的图像进行渲染,这样就省去了一部分等待时间。
但如果渲染文本的速度远远高于下载图像的速度,那么最终程序性能将与串行执行的性能相差不大。只有当大量相互独立且同构的任务可以并发处理时,才能体现出程序的工作负载分配到多个任务中带来的真正性能提升。
5.3. 改进:CompletionService实现
FutureRenderer
的并发度不高是因为它将所有图片的下载都放在了一个任务中,可以改成将每个图片的下载都独立成一个任务。但这样要考虑任务结果的获取问题,直接的办法是向Executor
提交一组下载任务,并保留每个与任务关联的Future
,然后反复调用get
方法,同时将timeout
指定为0,通过轮询来判断任务是否完成,但这显然很繁琐。
ExecutorCompletionService
相当于Executor
的装饰器。它持有Executor
和BlockingQueue
的实例,同时实现了一个私有的QueueingFuture
,对于提交的任务,它封装成QueueingFuture
然后委托给自己的Executor
去执行,并且在QueueingFuture
中改写了FutureTask
的done
实现,将结果放入自己的BlockingQueue
中。
- 这样就可以把任务结果的获取委托给
ExecutorCompletionService
1 | public class Renderer { |
6. 示例:旅行预订门户网站(限时任务)
通常用户输入旅行的日期和其他要求,门户网站获取并显示来自多条线路、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用web服务、访问数据库、执行一个EDI失误或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的消息。对于没有及时响应的服务提供者,页面可以忽略它们。
可以创建n个任务,提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法:invokeAll()
1 | private class QuoteTask implements Callable<TravelQuote>{ |
invokeAll
接收一组任务,并按照任务集合中迭代器的顺序将所有的Future
添加到返回的集合中,从而使调用者能将各个Future
与其表示的Callable
关联起来。在提交后,如果超过指定时限,任何还未完成的任务都会被取消。当invokeAll()
返回后,每个任务要么是结束了,要么是被取消了。
7. 线程池的使用
.关于ThreadPoolExecutor
的源码分析,可以参考笔记:https://shanhm1991.github.io/2018/04/04/20180404/ 。
只有当任务都是同类型并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则很可能造成拥塞;而如果提交的任务依赖于其他任务,那么除非线程池无线大,否则可能造成死锁,称为饥饿死锁。
- 饱和策略
一般ThreadPoolExecutor
会使用一个有界队列来保存已提交的任务,当有界队列被填满后,饱和策略开始发挥作用,默认的饱和策略是中止,并抛出一个未检查的RejectedExecutionException
,调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当然,也可以通过RejectedExecutionHandler
来自定义拒绝处理。
调用者运行策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者,让调用者自己执行这个任务,从而降低新任务的流量。因为执行任务需要一定的时间, 因此调用线程至少在一段时间内不会提交任何任务,从而使得工作者线程有时间来消化已经提交的任务。
例如对于WebServer,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务将在主线程中执行。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中,如果持续过载,那么TCP层最终发现它的请求队列被填满,然后开始抛弃请求。这样当服务器过载时,这种过载会逐渐向外蔓延开—线程池工作队列到应用程序再到TCP层,最终到达客户端,导致服务器在高负载下实现一种平缓的性能降低。
- ThreadPoolExecutor扩展
ThreadPoolExecutor
是支持扩展的,它提供了一些方法给子类实现,比如beforeExecute
、afterExecute
、terminated
。terminated
一般用来释放Executor
在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。
1 | public class TimingThreadPool extends ThreadPoolExecutor{ |
TimingThreadPool
中给出了一个自定义的线程池,通过beforeExecute
、afterExecute
、terminated
等方法来添加日志记录和统计信息收集。为了测量任务的运行时间,beforeExecute
必须记录开始时间并把它保存到一个afterExecute
可用访问的地方。因为这些方法将在执行任务的线程中调用,因此beforeExecute
可以把值保存到一个ThreadLocal
变量中,然后由afterExecute
来取。在TimingThreadPool
中使用了两个AtomicLong
变量,分别用于记录已处理的任务和总的处理时间,并在结束时在terminated
中计算任务的平均时间。
参考:
- Copyright ©《java并发编程实战》