Java并发编程实战②-并发工具类
[TOC]
Java并发编程实战-并发工具类
Java并发编程实战是学习《并发编程实战》和极客时间《Java并发编程》的记录。
并发工具类-总览
第二部分:并发工具类
- Lock和Condition(上):隐藏在并发包中的管程
- Lock和Condition(下):Dubbo如何用管程实现异步转同步?
- Semaphore:如何快速实现一个限流器?
- ReadWriteLock:如何快速一个完备的缓存?
- StampedLock:有没有比读写锁更快的锁?
- CountDownLatch和CyclicBarrier:如何让多线程步调一致?
- 并发容器:都有哪些”坑”需要我们填?
- 原子类:无锁工具类的典范
- Executor与线程池:如何创建正确的线程池?
- Feture:如何多线程实现最优的”烧水泡茶”程序?
- CompletableFuture:异步编程没有那么难。
- CompletionService:如何批量执行异步任务?
- Fork/Join:单机版的MapReduce
Lock和Condition(上):隐藏在并发包中的管程
Java SDK并发包内容很丰富,但是最核心的还是对管程的实现。
Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥 问题,Condition 用于解决同步问题。
Synchronized是默认对管程的实现。
并发编程的两大核心问题:
- 互斥:即同一时刻只允许一个线程 访问共享资源;
- 同步:即线程之间如何通信、协作。
Lock再造管程的理由:Synchronized升级版
- synchronized已经实现了管程了,为什么还需要重复造轮子呢?
- synchronized 申请资源的时候,如果申请不到,线程直 接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
- 我们从解决死锁问题出发,破坏”不可抢占条件”。使用synchronized是无法实现的,这里就引出了,我们如何设计一个新的互斥锁的问题。可以有三种方案和Lock的具体实现
- 能够响应中断:
void lockInterruptibly() throws InterruptedException;
- 支持超时:
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
- 非阻塞地获取锁:
boolean tryLock();
- 能够响应中断:
- Lock如何保证可见性
- Lock的使用有一个经典的范式:
try{}finally{}
- java多线程的可见性是通过happens-before规则保证的。
- Java SDK 里面锁可见性:它是利用了 volatile 相关的 Happens-Before 规则
- 顺序性规则
- volatile变量规则
- 传递性规则
- Lock的使用有一个经典的范式:
什么是可重入锁(ReentrantLock
)
ReentrantLock
可重入锁:指的是线程可以重复获取通一把锁。可重入函数:指的是多个线程可以同时调用该函数。线程安全
公平锁和非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}Java构造公平锁和非公平锁是通过重入锁的构造函数实现的。
公平锁和非公平锁的应用场景:
- 锁都对应着一个等待 队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队 列中唤醒一个等待的线程。
- 公平锁:唤醒的策略就是谁等待的时间长,就唤醒谁。
- 非公平锁:自己定义的策略。
用锁的最佳实践
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
并发问题,本来就难以诊断,所以你一定要让你的代码尽量安全,尽量简单,哪怕有一点可能会 出问题,都要努力避免。
Lock和Condition(下):Dubbo如何用管程实现异步转同步?
Condition 用于解决同步问题,Condition 实现了管程模型里面的条件变量。
Java 语言内置(
Synchronized
)的管程里只有一个条件变 量,而 Lock&Condition 实现的管程是支持多个条件变量的,这是二者的一个重要区别。谈谈
Synchronize
和Lock&Condition
实现管程API(”等待-通知”)的区别:
- Lock&Condition的API:await()、 signal()、signalAll()
- Synchronize的API:wait()、notify()、notifyAll()
同步与异步
什么是异步:通俗点来讲就是调用方是否需要等待结果,需要等待结果,就是同步。 不等待结果,就是异步。
异步调用:调用方创建一个子线程,在子线程中执行方法调用。
异步方法:方法实现的时候,创建一个新的线程执行主要逻辑,主线程直接 return。
Dubbo异步转同步
由于TCP协议本身就是异步的,Dubbo的底层协议就是TCP协议,我们经常要发起RPC调用。在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。但现实我们确实同步调用的。
而且Dubbo源码:
DefaultFuture.get()
方法实现的等待-通知机制。
Semaphore:如何快速实现一个限流器?
Semaphore信息量,是并发编程领域的终结者。几乎所有支持并发编程的语言都支持信号量机制。
Semaphore信号量模型
信号量模型的组成:一个计数器,一个等待队列,三个方法。在信号量 模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它 们,这三个方法分别是:init()、down() 和 up()。
init():设置计数器的初始化值
down()==P操作:semWait()==
acquire()
,计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程 可以继续执行。up()==V操作:semSignal()==
release()
,计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线 程,并将其从等待队列中移除。
package concurrent._16;
import java.util.Queue;
/**
* 描述:
* 信号量本质
*
* @author Noah
* @create 2019-10-12 08:58
*/
public class _16_Semaphore {
/**
* 计数器
*/
int count;
/**
* 等待队列
*/
Queue queue;
/**
* 初始化计数器
*
* @param count
*/
public _16_Semaphore(int count) {
this.count = count;
}
/**
* 计数器的值减 1;如果此时计数器的值小于 0,
* <p>
* 则当前线程将被阻塞,否则当前线程 可以继续执行。
*/
public void down() {
count--;
if (count < 0) {
//当前线程加入等待队列
//阻塞当前线程
}
}
/**
* 计数器的值加 1;如果此时计数器的值小于或者等于 0,
* <p>
* 则唤醒等待队列中的一个线 程,并将其从等待队列中移除。
*/
public void up() {
count++;
if (count <= 0) {
//移除等待队列
//唤醒等待线程
}
}
}
快速实现一个限流器(Semaphore)
- Semaphore最强大的地方:Semaphore可以允许多个线程访问一个临界区
- 栗子:其中,你可能最熟悉数据库连接池,在同一时刻,一定是允许多个线 程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
ReadWriteLock:如何快速一个完备的缓存?
使用管程和信号量这两个同步原语可以解决所有的并发问题。但是JDK并发包为什么还提供了很多工具类呢?
分场景优化性能,提升易用性。
什么是读写锁(ReadWriteLock)
- 读写锁的三条基本原则
- 允许多个线程同时读共享变量
- 只允许一个线程写共享变量
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
- 读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,读写锁的写操作是互斥的
- 因此读写锁非常适合在缓存这种场景使用,读多写少的并发场景。
快速实现一个缓存
两种实现缓存的方案:
- 初始化的时候,一次性全部数据载入缓存。适合数据量不大的场景。
- 按需加载缓存:查找数据不在缓存,再查询数据加入到缓存中。
```java
/**描述:
使用读写锁来实现缓存
@author Noah
@create 2019-10-13 20:55
*/
public class _17_ReadWriteLock_Cache<K, V> {/**
- 存储数据的载体:非线程安全
*/
final Map<K, V> data = new HashMap<>();
/**
- 可重入读写锁
*/
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
/**
- 读锁
*/
final Lock rl = rwl.readLock();
/**
- 写锁
*/
final Lock wl = rwl.writeLock();
/**
读缓存操作:
按需加载
@param key
@return
*/
public V getValue(K key) {V val = null;
rl.lock();
try {
val = data.get(key);
} finally {
rl.unlock();
}if (val != null) {
return val;
}wl.lock();
try {
//再次校验:可能其他线程已经获取到了数据
val = data.get(key);
if (val == null) {
//查询数据写入缓存场景
data.put(key, val);
}
} finally {
wl.unlock();
}
return val;
}
/**
- 写缓存操作
- @param k
- @param v
*/
public void writeValue(K k, V v) {
wl.lock();
try {
data.put(k, v);
} finally {
wl.unlock();
}
}
- 存储数据的载体:非线程安全
}
### 读写锁的升级与降级
- `ReentrantReadWriteLock`先总结:读锁升级为写锁是不能的,但是写锁降级为读锁是可以的。
- ```java
/**
* 读锁升级为写锁:不能实现
* <p>
* 写锁降级为读锁:可以实现
*
* @param key
* @return
*/
public V get(K key) {
rl.lock();
V r = null;
try {
r = data.get(key);
} finally {
rl.unlock();
}
if (r == null) {
try {
wl.lock();
if ((r = data.get(key)) != null) {
return r;
}
//让缓存中写入数据
data.put(key, r);
rl.lock();
} finally {
wl.unlock();
}
}
try {
//读锁仍然持有
data.size();
} finally {
rl.unlock();
}
return r;
}
读写锁总结
- 读写锁跟
ReentrantLock
类似,都是实现了Lock。支持公平模式和非公平模式。 - 只有写锁支持条件变量, 读锁是不支持条件变量的。
StampedLock:有没有比读写锁更快的锁?
读多写少的并发场景,使用ReentranstReadWriteLock性能已经很好了,但是JDK1.8提供了一个叫StampedLock的锁。
ReentrantReadWriteLock和StampedLock的比较
ReadWriteLock:只支持两种模式,读锁和写锁。
StampedLock:支持三种模式,写锁、悲观读锁和乐观读(无锁)。前面二者和ReadWriteLock类似
在ReadWriteLock的读写锁,在有线程在读操作的时候,写操作会被阻塞的。但是在StampedLcok下,是乐观读的时候,是允许一个线程执行写操作。
```java
/**
* 乐观读栗子
*/
public void optimisticRead() {
long stamp = stampedLock.tryOptimisticRead();
//乐观读是无锁的,把成员变量读入到局部变量,非线程安全的。
int curX = x;
int curY = y;
//校验:在执行读的过程是否有写操作,有的话就升级为悲观读
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
curX = x;
curY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
}
|
- stamp和数据库的version是同样作用的。
StampedLock注意事项
- StampedLcok的功能仅仅是ReadWriteLock的子集
- StampedLcok是不支持重入的。
- StampedLock 的悲观读锁、写锁都不支持条件变量。
- 使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的 悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()。不然cpu会100%
CountDownLatch和CyclicBarrier:如何让多线程步调一致?
对账系统的优化:单线程—>多线程—>线程池—>CountDownLatch(实现主线程等待)
—>考虑使用双队列(但是要考虑到步调)—>CyclicBarrier(同步优化)
对账系统的最终实现流程:采用双队列,队列1:查询订单,队列2:派单队列。有未对账的订单,调用对账逻辑,
CyclicBarrier协调两个队列的步调。
介绍CountDownLactch和CyclicBarrier
java.util.concurrent.CountDownLatch
和java.util.concurrent.CyclicBarrier
的比较:- 同:JDK并发包提供两个易用的线程同步工具类
- 异:CountDownLatch 主要用来解决一个线程等待 多个线程的场景 。CountDownLatch 的计数器是不能循环利用的
- 异:CyclicBarrier 是一组线程之间互相等待 ,但CyclicBarrier 的计数器是可以循环利用的 。
并发容器:都有哪些”坑”需要我们填?
并发容器和同步容器是两个概念。容器4大类型: List、Map、Set 和 Queue 。并非所有容器都是线程安全。
常见同步容器:经过Collections包装后的线程安全的容器(synchronized实现),还有vector、stack、hashtable。
并发容器:性能更好。解决同步容器使用synchronized来保证互斥,串行度太高了。
- 如何将非线程安全的容器变成线程安全的容器?
- 只要把非线程安全的容器封装在对象内部,然后控制好访问路径就可以了。 (同管程思想)
同步容器
- 同步容器注意的地方:
- 组合操作注意竞态条件问题:即使每个操作都能保证原子性,但是组合操作无法保证原子性。
- 使用迭代器遍历线程安全容器:要确保线程安全,需要我们使用synchronized(集合)再遍历容器。
并发容器
并发容器注意的地方:
常见并发容器
java.util.concurrent.CopyOnWriteArrayList
- 特点:内部维护一个数组,在遍历(读)容器是在原内部数组进行,写操作的copy一个新的数组。
- 场景:写操作非常少的场景,而且能够容忍读写的短暂不一致 。
- 注意”坑”:
- CopyOnWriteArrayList 迭代器是只读的,不支持增删改。
Map
ConcurrentHashMap
- key是无序的
- key和value都不能为空。
ConcurrentSkipListMap
- key是有序的
- key和value都不能为空。
- 基于SkipList(跳表)实现,平均时间复杂度为O(log n))
- Map比较示意图
Set
- CopyOnWriteArraySet
- ConcurrentSkipListSet
Queue
- 不同维度分类
- 阻塞( Blocking )和非阻塞
- 单端( Queue )和双端(Deque ):单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入 队出队。
- 两个维度组合之后的Queue
- 单端阻塞队列
- ArrayBlockingQueue(数组)、LinkedBlockingQueue(队列)、 SynchronousQueue(无队列)、LinkedTransferQueue(2+3的升级版)、PriorityBlockingQueue(优先级队列) 和 DelayQueue (延时队列)
- 双端阻塞队列
- LinkedBlockingDeque
- 单端非阻塞队列
- ConcurrentLinkedQueue
- 双端非阻塞队列
- ConcurrentLinkedDeque
- 只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界 的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患
- 单端阻塞队列
- 不同维度分类
原子类:无锁工具类的典范
原子性问题:我们之前都是使用互斥锁来解决的,我们可以考虑使用无锁方案来解决,其性能更好。
原子类本质(CAS)
原子类性能高的原因:
- 硬件支持:CPU为了解决并发问题,提供了CAS指令。
什么是CAS指令?
- CAS:Compare and swap,比较并交换。
- CAS指令包含三个参数:共享变量的内存地址A,用于比较的值B和共享变量的新值C。
- 只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C 。
- 作为一条 CPU 指令,CAS 指令本身是能够保证原 子性的。
CAS+自旋 :循环尝试
```java
volatile int count;
/**
* cas+自旋
/
void addOne() {
int newVal;
do {
newVal = count + 1;
} while (count != cas(count, newVal));
}
/*
* 模拟cas指令
*
* @param expect
* @param newVal
* @return
*/
public int cas(int expect, int newVal) {
int curVal = count;
if (curVal == expect) {
count = newVal;
}
//返回写入前的值
return curVal;
}
- CAS需要注意ABA问题
1. 解决方案:参考乐观锁机制,具体:更新的时候增加一个版本号。
### Java原子类如何实现
>- jdk源码实现CAS
>
>
>```java
>public final int getAndAddInt(Object var1, long var2, int var4) {
> int var5;
> do {
> var5 = this.getIntVolatile(var1, var2);
> } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
>
> return var5;
>}
>
- CAS经典规范
>do {
>// 获取当前值
>oldV = xxxx;
>// 根据当前值计算新值 newV = ...oldV...
>}while(!compareAndSet(oldV,newV);
JDK原子类概览
- jdk原子化的基本数据 类型、原子化的对象引用类型、原子化数组、原子化对象属性更新器和原子化的累加器 。
- AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题 。乐观锁机制。
总结
Java 提供的原子类能够解决一些简单的原子性问题,但你可能会发现,上面我们所有原子类的方 法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方 案。原子类虽好,但使用要慎之又慎。
jdk原子类都提供了compareAndSet()方法,但是建议不好自己去实现无锁算法。
Executor与线程池:如何创建正确的线程池?
创建线程是一个重量级的对象,应该避免频繁创建和销毁。
所以使用线程池。线程池跟普通的池化资源不同,不是acquire(),之后就release()。
而是采用生产者—消费者模式。
线程池也是一种生产者——消费者模式
- 自定义简单线程池
/**
* 阻塞队列
*/
private BlockingQueue<Runnable> blockingQueue;
/**
* 工作线程
*/
private List<NoahThreadPool> threadPools = new ArrayList<>();
/**
* 构造线程池
*
* @param queue
* @param poolSize
*/
public _22_NoahThreadPool(BlockingQueue<Runnable> queue, int poolSize) {
this.blockingQueue = queue;
for (; poolSize > 0; poolSize--) {
NoahThreadPool thread = new NoahThreadPool();
thread.start();
threadPools.add(thread);
}
}
/**
* 暴露外部任务接口
*
* @param runnable
*/
public void execute(Runnable runnable) {
blockingQueue.add(runnable);
}
class NoahThreadPool extends Thread {
public void run() {
while (true) {
try {
Runnable task = blockingQueue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- jdk自定义线程池通过:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数解析
- corePoolSize :表示线程池保有的最小线程数
- maximumPoolSize :表示线程池创建的最大线程数
- keepAliveTime & unit :表示”一段时间”线程的空闲状态。
- workQueue :工作队列
- threadFactory :通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
- handler :拒绝策略(当maximumpoolsize都在工作&&queue也满了&&queue是有界队列)
- CallerRunsPolicy :提交任务的线程自己去执行任务
- AbortPolicy :默认拒绝策略,会抛出RejectedExecutionException
- DiscardPolicy :直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy :丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入 到工作队列。
注意事项:
强烈建议使用有界队列
默认拒绝策略要慎重使用
异常处理 :发生了异常却收不到通知,误认为正常运行
- ```
try { //业务逻辑
} catch (RuntimeException x) { //按需处理
} catch (Throwable x) { //按需处理
}
## Feture:如何多线程实现最优的"烧水泡茶"程序?
ThreadPoolExecutor线程池获取执行结果。在`public void execute(Runnable command)`无法获得执行结果。
但是在该类中提供了三个submit()方法和一个FutureTask工具类来获取任务执行的结果。
- submit()方法的三个参数
- ```java
//有返回值
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(FutureTask<T> futureTask);
//无返回值
Future<?> submit(Runnable task);
Future接口的五个方法
// 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否已取消 boolean isCancelled(); // 判断任务是否已结束 boolean isDone(); // 获得任务执行结果 get(); // 获得任务执行结果,支持超时 get(long timeout, TimeUnit unit);
### 总结
利用Java并发包提供的Future可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor执行的,还是通过手工创建子线程来执行的。
## CompletableFeture:异步编程没有那么难。
> 异步化:是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的 基础。
>
> JDK1.8提供了`java.util.concurrent.CompletableFuture`来支持异步编程。
### CompletableFuture的优势
1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
2. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。
3. 精确描述上下文关系
### CompletableFuture对象创建
```java
//Runnable.run()没有返回值。
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
//Supplier.get()有返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)还有一个区别可以手动指定线程池参数。
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的 核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool 线程池的线程数)。
建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。
对于 一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果
java.util.concurrent.CompletionStage接口理解
任务之间是有时序关系的,比如:串行关系,并行关系,汇聚关系(AND、OR)等。
CompletionStage接口可以清晰地描述任务之间的这种时序关系 。
异常处理也是必须要处理情况。
串行关系
- CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四 个系列的接口。
- 这些方法里面Async代表的是异步执行fn、consumer或者action。其中,需要你注意的是thenCompose系列 方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。
汇聚AND关系
- CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列
汇聚OR关系
- CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列
异常处理
fn、consumer、action它们的核心方法都不允许抛出可检查异常,但是却无法限制它 们抛出运行时异常.
在非异步编程:我们可以使用try{}catch{}是捕获异常。
在异步编程:CompletionStage接口提供了相关的方法(链式编程):
```java
//exceptionally()的使用非常类似于 try{}catch{}中的catch{},
CompletionStage exceptionally(fn);
//whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{}
//whenComplete()和 handle()的区别在于whenComplete()不支持返回结果,而handle()是支持返回结果的。
CompletionStagewhenComplete(consumer);
CompletionStagewhenCompleteAsync(consumer);
CompletionStagehandle(fn);
CompletionStagehandleAsync(fn);
### 总结
> 异步编程流行起来的原因,是因为回调地狱(Callback Hell)成功解决。
## CompletionService:如何批量执行异步任务?
> `ThreadPoolExecutor+Future `+`CompletableFuture`+`CompletionService`:分工领域
>
> 细观察你会发现这 些工具类都是在帮助我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间如何协作的细节上 (比如线程之间如何实现等待、通知等)。对于简单的并行任务,你可以通过“线程池+Future”的方案来 解决;如果任务之间有聚合关系,无论是AND聚合还是OR聚合,都可以通过CompletableFuture来解决;而 批量的并行任务,则可以通过CompletionService来解决。
### 创建CompletionService对象
- ExecutorCompletionService是CompletionService接口的实现类。
- 接口的五个方法,你能够理解的。
```java
public ExecutorCompletionService(Executor executor)
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
利用CompletionService实现Dubbo中的Forking Cluster
如何理解Forking cluster:支持并行地调用多个查询服务,只要有一个成 功返回结果,整个服务就可以返回了。
总结
当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻 塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外, CompletionService能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可 以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
CompletionService的实现类ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但 好处是你可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能避免几个特别耗时的任务拖 垮整个应用的风险。
Fork/Join:单机版的MapReduce
前面我们讨论的三种任务模型,并行,聚合,批量并行这三种任务模型。在任务的层次来看(
ThreadPoolExecutor+Future
+CompletableFuture
+CompletionService
):都是分工的事情(分工、同步、互斥)。但是分治这种任务模型我们还没解决:Java并发包里提供了一种叫做Fork/Join的并行计算框架,就 是用来支持分治这种任务模型的。
分治:把一个复杂的问题分解成多个相似的子问题,然后再把子问题分 解成更小的子问题,直到子问题简单到可以直接求解。
分治任务模型
分治任务分为两个阶段:任务分解和结果合并
所以:分治任务一般都是采用递归算法
Fork/Join的使用
Fork/Join是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork对应的是分治 任务模型里的任务分解,Join对应的是结果合并。Fork/Join计算框架主要包含两部分,一部分是分治任务的 线程池ForkJoinPool,另一部分是分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和 Runnable的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型ForkJoinTask .
java.util.concurrent.ForkJoinTask抽象类
- 最核心的是fork()方法(异步地执行一个任务)和join()方法 (阻塞当前线程获取执行结果)
- 有两个子类(抽象类)—— RecursiveAction(compute ()方法没有返回值)和RecursiveTask (compute ()方法有返回值)
- ForkJoinPool是核心组件,也是生产者-消费者模式。更加高级的是当有空闲的线程,会任务窃取忙于的线程(双端队列)
斐波那契数列:Fork/Join实现
```java
package concurrent._26;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;/**
描述:
使用Fork/join分治框架实现斐波那契数列
@author Noah
@create 2019-10-22 09:11
*/
public class _26_forkJoin_Fibonacci {
public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool(4); Fibonacci fib = new Fibonacci(4); Integer r = forkJoinPool.invoke(fib); System.out.println("r=" + r);
}
}
/**
分治递归任务
*/
class Fibonacci extends RecursiveTask{ final int n;
public Fibonacci(int n) {
this.n = n;
}@Override
protected Integer compute() {if (n <= 1) { return n; } Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join();
}
}
- 统计千万单词的出现次数
- ```java
package concurrent._26;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 描述:
* <p>
* MapReduce
* 使用使用fork/join框架计算1亿单词的出现次数
*
* @author Noah
* @create 2019-10-23 09:17
*/
public class _26_CountBillionWord {
public static void main(String[] args) {
String[] fc = {"hello world",
"hello me",
"hello fork",
"hello join",
"fork join in world"};
//创建ForkJoin线程池
ForkJoinPool fjp = new ForkJoinPool(3);
//创建任务
MR mr = new MR(fc, 0, fc.length);
//启动任务
Map<String, Long> result = fjp.invoke(mr); //输出结果
result.forEach((k, v) -> System.out.println(k + ":" + v));
}
}
//MR模拟类
class MR extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;
//构造函数
MR(String[] fc, int fr, int to) {
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override
protected Map<String, Long> compute() {
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start + end) / 2;
MR mr1 = new MR(fc, start, mid);
mr1.fork();
MR mr2 = new MR(fc, mid, end);
//计算子任务,并返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}
//合并结果
private Map<String, Long> merge(
Map<String, Long> r1,
Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>();
result.putAll(r1);
//合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null)
result.put(k, c + v);
else
result.put(k, v);
});
return result;
}
//统计单词数量
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
//分割单词
String[] words = line.split("\\s+");
//统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null)
result.put(w, v + 1);
else
result.put(w, 1L);
}
return result;
}
}
总结
Fork/Join并行计算框架主要解决的是分治任务,分治的核心思想是“分而治之”,Fork/Join看作单机版的MapReduce。
Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制。建议用不同的ForkJoinPool执行不同类型的计算任务。