说些什么呢

关于并发(WIP)

· Jahjah

为什么要使用线程?

希望多个任务可以一起做,希望耗时长的任务不阻塞我的主线程(关于什么是线程可以看这篇文章),既然有这样的需求,肯定也有对应的解决方案。 比如可以通过创建多个线程来并发执行多个任务

new Thread(() -> {
    doFirstTask();    
}).start()

new Thread(() -> {
    doSecondTask();    
}).start()

可以通过创建线程来异步执行耗时长的任务

Integer result = doSomething();

new Thread(() -> {
    doTask();    
}).start()

return result;

通过上述的两种方法满足了需求,但是也存在一些问题,比如

  • 如果我有一万个任务,难道要创建一万个线程去做吗?
  • 如果创建线程的代码藏身于巨长无比的业务逻辑中,然后被别的代码反复调用怎么办?
  • 如果最多创建10个线程,那当所有线程都在工作,新任务来了怎么办?
  • 系统创建了很多线程,要如何对创建的线程做监控,知道他们各自的状态?

所以,线程作为一种重要的资源,需要对其进行管理

线程池

当有对线程进行管理的需求时,使用线程池将会是一种不错的解决方案。只需要配置几个参数,就可以很轻松的满足上述需求。


ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2,                       // 核心线程数
    4,                       // 最大线程数
    60,                      // 空闲线程的存活时间
    TimeUnit.SECONDS,        // 存活时间单位
    new ArrayBlockingQueue<>(10) // 任务队列
);

在创建线程池的时候可以根据硬件(CPU核心数),业务的类型(CPU密集/IO密集)通过corePoolSizemaximumPoolSize对线程的数量控制,同时通过keepAliveTimetimeUnit实现线程数量的动态调节,这样既可以避免线程资源的浪费,也不会频繁的创建销毁线程。 同时还提供了类似于getPoolSize()getActiveCount()getCompletedTaskCount()等方法查看线程池的运行状态,还有beforeExecute(),**afterExsecute()**添加额外的逻辑方便进行线程池的监控,通过长期的监控来配置线程池的线程数也更为合理

线程池很好的解决了线程管理的问题,但现实中的任务之间往往没有很好的并发性,如果任务之间相互依赖怎么办,比如

  • 做完taksA再去做taskB
  • taskB依赖taskA的执行结果
  • taskA,taskB都执行完了再去做taskC
  • taskA,taskB中有一个完成了再去做taskC

Complateable future

当我们有多个并发任务可组合的需求时,Complateable future会是一个很好的解决方案

做完taskA再去做taskB

CompletableFuture.runAsync(() -> {
    doTaskA();
}).thenRun(() -> {
    doTaskB();
});

taskA依赖taskB执行结果

CompletableFuture.supplyAsync(() -> {
    return doTaskA();
}).thenAccept(taskAResult -> {
    doTaskB(taskAResult);
});

taskA,taskB都执行完了再去做taskC

CompletableFuture<Void> taskC = CompletableFuture.allOf(taskA, taskB)
                .thenRun(() -> {
                    doTaskC();
                });

taskA,taskB中有一个完成了再去做taskC

CompletableFuture<Void> taskC = CompletableFuture.anyof(taskA, taskB)
                .thenRun(() -> {
                    doTaskC();
                });

通过这种链式的写法很好的把多个任务组合了起来,无论是可读性,还是维护性都是非常好的。而且还提供异常处理,让代码更健壮

CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("Something went wrong!");
    return "Success";
}).exceptionally(ex -> {
    System.out.println("Exception: " + ex.getMessage());
    return "Fallback Result";
}).thenAccept(System.out::println);