《Java并发编程实战》 任务的执行

words: 5.1k    views:    time: 20min

任务通常是一些抽象且离散的工作单元,通过把程序中的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,并提供一种自然的并行工作结构来提升并发性。

在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量快速的响应性 。程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。当负荷过载时,程序的性能应该是逐渐降低,而不是直接失败。

理想情况下,可以找出清晰的任务边界,各个任务之间是相互独立的,即任务不依赖于其他任务的状态、结果或边界效应。独立性有助于并发,因为如果存在足够多的处理资源,那么这些独立的任务就可以并行执行,这也为调度与负载均衡等的实现提供了更好的灵活性。

1. 示例:处理请求

需求:将每个请求视为一个独立的任务,进行处理

1.1. 串行执行

SingleThreadWebServer
1
2
3
4
5
6
7
8
9
public class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true){
Socket connection = socket.accept();
handleRequest(connection);
}
}
}

SingleThreadWebServer由于是单线程处理,在等待I/O操作完成时,CPU将处于空闲状态,因此服务器的资源利用率非常低。

通常,在web请求的处理中都会包含一组不同的运算与I/O操作,服务器必须处理套接字I/O以读取请求和写回响应,而这些操作通常会由于网络拥塞或联通性问题而被阻塞。

1.2. 改进:线程执行

ThreadPerTaskWebServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket connection = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}

ThreadPerTaskWebServer中以独立的请求为边界,创建线程进行处理。但线程的创建与销毁并非没有代价,并且活跃的线程会消耗系统资源。如果可运行的线程数多于可用处理器数,那么有些线程将会闲置,大量的空闲线程将会占用内存,给垃圾回收器带来压力。另外,大量线程在竞争CPU资源时还将产生额外的性能开销,如果已经有足够多的线程使所以cpu保持忙碌状态,那么更多的线程只会降低性能。

.一般如果任务是cpu密集型的不建议创建超过cpu数量的线程,而如果任务中的I/O操作比较多,则可以创建较多线程,因为I/O的读写速度远小于cpu的处理速度,cpu在等待I/O的时候其实是闲置的,因此创建更多的任务能提高cpu的利用率

2. Executor

Executor
1
2
3
public interface Executor{
void excute(Runnable command);
}

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。在java类库中,任务执行的主要抽象不是Thread,而是Executor。虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来

Executor使用Runnable来表示任务,它可以支持多种不同类型的任务执行策略,另外还提供了对生命周期的支持,以及统计信息搜集、应用程序管理机制和性能检测等机制。通过使用Executor,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。但是,如果无法正确地关闭Executor,那么JVM将无法结束,因为JVM只有在所有非守护线程全部终止后才会退出

2.1. 改进:Executor执行

TaskExecutorWebServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TaskExecutorWebServer {   

private static final int NTHREADS = 100;

private static final ExecutorService exec = Executors.newFixedThreadPool(100);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true){
final Socket connection = socket.accept();
Runnable task = new Runnable(){
public void run(){
handleRequest(connection);
}
};
exec.execute(task);
}
}
}

TaskExecutorWebServer中通过Executor将任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变服务的行为。比如,为每一个请求都创建一个新线程:

TaskExecutorWebServer
1
2
3
4
5
public class TaskExecutorWebServer implements Executor{    
public void excute(Runnable r){
new Thread(r).start();
};
}

或者类似于单线程的行为,以同步的方式执行每个任务:

TaskExecutorWebServer
1
2
3
4
5
public class TaskExecutorWebServer implements Executor{    
public void excute(Runnable r){
r.run();
};
}

3. ExecutorService

ExecutorService扩展了Executor接口,添加了一些管理生命周期的方法,以及一些用于任务提交的便利方法

ExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface ExecutorService extends Executor {

// 平缓关闭:不再接受新的任务,同时等待已经提交的任务完成(包括那些还未开始执行的任务)
void shutdown();

// 强行关闭:尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务
List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的生命周期有3个状态:运行、关闭、终止,在初始化后处于运行状态。

所有任务都完成后,ExecutorService进入终止状态,可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常awaitTerminationshutdown配合调用,以同步等待ExecutorService关闭。

ThreadPoolExecutor实现了ExecutorService,它维护了一个工作队列和一组工作线程worker,队列中保存了所有等待执行的任务,worker的任务很简单:即从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

java类库中提供了一些创建线程池的静态工厂,但一般不建议使用,其实通过构造器可以根据实际场景加更灵活地创建

  • Executors.newFixedThreadPool:创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池的最大数量,然后线程池的规模不再变化,如果某个线程由于Exception而意外结束,那么线程池会补充一个新的线程。

  • Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,则回收空闲的线程,而当需求增加时,则添加新的线程,线程池的规模不存在任何限制。

  • Executors.newSingleThreadExecutor:是一个单线程的Executor,它创建单个工作线程来执行任务,如果这个线程异常结束,则创建一个新线程来替代,它可以确保任务按照队列中的顺序来串行执行。

  • Executors.newScheduledThreadPool:创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务。

3.1. 改进:解决Executor的关闭问题

LifecycleWebServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class LifecycleWebServer{ 

private final ExecutorService exec = Executors.newFixedThreadPool(100);

public void start() throws IOException{
ServerSocket socket = new ServerSocket(80);
while(!exec.isShutdown()){
try{
final Socket conn = socket.accept();
exec.execute(new Runnable(){
public void run(){
handleRequest(conn);
}
});
}catch(RejectedExecutionException e){
if(!exec.isShutdown()){
log("task submission rejected",e);
}
}
}
}

public void stop(){
exec.shutdown();
}

void handleRequest(Socket connection){
Request req = readRequest(connection);
if(isShutdownRequest(req)){
stop();
}else{
dispatchRequest(req);
}
}
}

4. 定时任务

Timer负责管理延迟或周期任务,然而存在一些缺陷。Timer在执行所有定时任务时只会创建一个线程,若某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性,而且Timer是基于绝对时间而不是相对时间的调度机制,因此,任务的执行对系统的时钟变化很敏感。另外Timer线程并不捕获异常,如果TimerTask抛出未检查的异常,定时线程将终止,Timer也不会恢复线程,因此已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。

ScheduledThreadPoolExecutor可以解决这些问题,它只基于相对时间进行调度,而且如果调度线程异常终止,它会启动新的线程来代替。

ScheduledThreadPoolExecutor内部依赖一个私有实现的DelayedWorkQueue,来进行任务的调度。其实,通过DelayQueue也可以构建自己的调度服务。DelayQueue中管理着一组Delayed对象,每个Delayed对象有一个相应的延迟时间,只有某个元素逾期后,才能执行take操作,其返回的对象将根据延迟时间进行排序。

5. 示例:页面渲染

需求:渲染一个Html页面,假设:页面只包含标签文本,以及预定义大小的图片和URL。

5.1. 串行渲染

串行处理是最简单的方法,这很容易实现,程序只需将输入中的每个元素渲染一次。但这样用户体验很差,他们必须等待很长时间,直到所有渲染完成。

好一点的办法是先渲染文本,同时为图像预留出占位空间。在处理完文本后,再开始下载图像,并将它们渲染到对应的占位空间。

SingleThreadRender
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SingleThreadRender {
void renderpage(CharSequence source){
renderText(source);

List<ImageData> imageData = new ArrayList<ImageData>();
for(ImageData imageInfo : scanForImageInfo(source)){
imageData.add(imageInfo.downloadImage());
}

for(ImageData data : imageData){
renderImage(data);
}
}
}

图像的下载过程大部分时间都是在等待I/O操作,在这期间CPU几乎是空闲的。因此,这种方式没有充分的利用CPU,使得用户在看到最终页面之前要等待很长时间。因此,可以将问题分解为多个独立的任务并发执行,从而获得更高的CPU利用率和灵敏度。

5.2. 改进:Future实现

Future表示一个任务的生命周期,其提供一些方法来判断任务是否已经完成或取消、获取任务的结果,或者取消任务。

Executor框架使用Runnable作为其基本的任务抽象,但Runnable有很大的局限性,虽然结果可以写到日志文件或者放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。Callable是一种更好的抽象,它将返回一个值,并可以抛出异常。

.Executor会将RunnableCallable统一适配成FutureTask,其实,FutureTask就是一个适配器,它实现了Runnable接口并接收一个Callable实例。另外它也是一个装饰器,它同时实现了Future接口,所以它拥有对任务生命周期的控制

Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。有时一些任务可能要执行很长的时间,因此可能希望能够取消这些任务。在Executor中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当他们能响应中断时(即任务主动检测中断请求,并在异常处理中结束自己),才能取消。

针对上面的问题,可以将渲染过程分为两个任务,一个是渲染所有的文本(cpu),另一个是下载所有的图像(I/O)。

FutureRenderer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class FutureRenderer {

private final ExecutorService executor = Executors.newFixedThreadPool(1);

void renderPage(CharSequence source){
final List<ImageInfo> imageInfos = scanForImageInfo(source);

Callable<List<ImageData>> task = new Callable<List<ImageData>>(){
public List<ImageData> call(){
List<ImageData> result = new ArrayList<ImageData>();
for(ImageInfo imageInfo : imageInfos){
result.add(imageInfo.downloadImage);
}
return result;
}
}

Future<List<ImageData>> future = executor.submit(task);
renderText(source);

try{
List<ImageData> imageData = future.get();
for(ImageData data : imageData){
renderImage(data);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt(); // 重新设置线程的中断状态
future.cancel(); // 由于不需要结果,因此取消任务
}catch(ExecutionException e){
throw launderThrowable(e.getCause());
}

}
}

FutureRenderer让渲染文本与下载图像并发执行,当渲染完文本后再获取下载的图像进行渲染,这样就省去了一部分等待时间。

但如果渲染文本的速度远远高于下载图像的速度,那么最终程序性能将与串行执行的性能相差不大。只有当大量相互独立且同构的任务可以并发处理时,才能体现出程序的工作负载分配到多个任务中带来的真正性能提升。

5.3. 改进:CompletionService实现

FutureRenderer的并发度不高是因为它将所有图片的下载都放在了一个任务中,可以改成将每个图片的下载都独立成一个任务。但这样要考虑任务结果的获取问题,直接的办法是向Executor提交一组下载任务,并保留每个与任务关联的Future,然后反复调用get方法,同时将timeout指定为0,通过轮询来判断任务是否完成,但这显然很繁琐。

ExecutorCompletionService相当于Executor的装饰器。它持有ExecutorBlockingQueue的实例,同时实现了一个私有的QueueingFuture,对于提交的任务,它封装成QueueingFuture然后委托给自己的Executor去执行,并且在QueueingFuture中改写了FutureTaskdone实现,将结果放入自己的BlockingQueue中。

  • 这样就可以把任务结果的获取委托给ExecutorCompletionService
Renderer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Renderer {

private final ExecutorService executor;

public Renderer(ExecutorService executor){
this.executor = executor;
}

public renderPage(CharSequence source){
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for(final ImageInfo imageInfo : info){
completionService.submit(new Callable<ImageData>(){
public ImageData call(){
return imageInfo.downloadImage();
}
});

renderText(source);

try{
for(int t=0, n=info.size(); t<n; t++){
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
}catch(InterruptedException e){
Thread.currentThread().interrupt();
}catch(ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
}

6. 示例:旅行预订门户网站(限时任务)

通常用户输入旅行的日期和其他要求,门户网站获取并显示来自多条线路、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用web服务、访问数据库、执行一个EDI失误或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的消息。对于没有及时响应的服务提供者,页面可以忽略它们。

可以创建n个任务,提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法:invokeAll()

QuoteTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private class QuoteTask implements Callable<TravelQuote>{

private final TravelCompany company;

private final TravelInfo travelInfo;

//...

public TravelQuote call() throws Exception{
return company.solicitQuote(travelInfo);
}
}

public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time,TimeUnit unit) throws InterruptedException{

List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for(TravelCompany company : companies){
tasks.add(new QuoteTask(company, travelInfo));
}
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);

List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIte = tasks.iterator();
for(Future<TravelQuote> f : futures){
QuoteTask task = taskIte.next();
try{
quotes.add(f.get());
}catch(ExecutionException e){
quotes.add(task.getFailureQuote(e.getCause()));
}catch(CancellationException e){
quotes.add(task.getTimeoutQuote(e));
}
}
return quotes;
}

invokeAll接收一组任务,并按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。在提交后,如果超过指定时限,任何还未完成的任务都会被取消。当invokeAll()返回后,每个任务要么是结束了,要么是被取消了。

7. 线程池的使用

.关于ThreadPoolExecutor的源码分析,可以参考笔记:https://shanhm1991.github.io/2018/04/04/20180404/

只有当任务都是同类型并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则很可能造成拥塞;而如果提交的任务依赖于其他任务,那么除非线程池无线大,否则可能造成死锁,称为饥饿死锁。

  • 饱和策略

一般ThreadPoolExecutor会使用一个有界队列来保存已提交的任务,当有界队列被填满后,饱和策略开始发挥作用,默认的饱和策略是中止,并抛出一个未检查的RejectedExecutionException,调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当然,也可以通过RejectedExecutionHandler来自定义拒绝处理。

调用者运行策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者,让调用者自己执行这个任务,从而降低新任务的流量。因为执行任务需要一定的时间, 因此调用线程至少在一段时间内不会提交任何任务,从而使得工作者线程有时间来消化已经提交的任务。

例如对于WebServer,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务将在主线程中执行。在这期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中,如果持续过载,那么TCP层最终发现它的请求队列被填满,然后开始抛弃请求。这样当服务器过载时,这种过载会逐渐向外蔓延开—线程池工作队列到应用程序再到TCP层,最终到达客户端,导致服务器在高负载下实现一种平缓的性能降低。

  • ThreadPoolExecutor扩展

ThreadPoolExecutor是支持扩展的,它提供了一些方法给子类实现,比如beforeExecuteafterExecuteterminated
terminated一般用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

TimingThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TimingThreadPool extends ThreadPoolExecutor{

public TimingThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

protected void beforeExecute(Thread t,Runnable r){
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}

protected void afterExecute(Throwable t,Runnable r){
try{
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
}finally{
super.afterExecute(r, t);
}
}

protected void terminated(){
try{
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
}finally{
super.terminated();
}
}
}

TimingThreadPool中给出了一个自定义的线程池,通过beforeExecuteafterExecuteterminated等方法来添加日志记录和统计信息收集。为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute可用访问的地方。因为这些方法将在执行任务的线程中调用,因此beforeExecute可以把值保存到一个ThreadLocal变量中,然后由afterExecute来取。在TimingThreadPool中使用了两个AtomicLong变量,分别用于记录已处理的任务和总的处理时间,并在结束时在terminated中计算任务的平均时间。


参考:

  1. Copyright ©《java并发编程实战》