4.基础构建模块

tech2025-11-05  8

本章介绍一些有用的并发构建模块,以及这些模块在构建并发应用程序时的一些常用模式。

1.同步容器类

早期同步容器:Vector和Hashtable。他们实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法进行同步,使得每次只有一个线程能访问容器状态

 

(1)同步容器的问题

/** * Vector存在的问题 */ public class VectorQuestion { public static Object getLast(Vector list){ int lastIndex = list.size() - 1; return list.get(lastIndex); } public static Object deleteLast(Vector list){ int lastIndex = list.size() - 1; return list.remove(lastIndex); } }

表面上看,无论多少个线程同时调用,都不会破坏vector。但是调用结果对于调用者来说可能是不对的:调用的正好是被删掉的那个元素,会抛数组角标越界错。因此需要额外加锁。

/** * Vector存在的问题 */ public class VectorQuestion { public static Object getLast(Vector list){ synchronized (list) { int lastIndex = list.size() - 1; return list.get(lastIndex); } } public static Object deleteLast(Vector list){ synchronized (list) { int lastIndex = list.size() - 1; return list.remove(lastIndex); } } }

同样,迭代器也会出现这样的问题。

 

(2)迭代器与ConcurrentModificationException

我们对容器类进行迭代的标准方式是:Iterator。但是如果有其他线程并发修改容器,那么使用迭代器就需要在迭代期间对容器加锁。如果在迭代的时候呗修改了数据,会抛ConcurrentModificationException错

它实现方式是:将计数器的变化与容器关联起来,如果在迭代期间计数器被修改,那么hasNext或next就会抛错。但是呢,在没有同步的情况下,可能会看到计数器被修改了,但是迭代器还没意识到已经发生修改。

 

想要避免ConcurrentModificationException,就需要在迭代过程中持有容器的锁,这个和Vector的解决方案是一样的。但是有弊端,长时间持有锁会降低性能啊。

还有一种办法就是克隆容器,并在副本上进行迭代。因为副本封闭在线程里,其他线程修改不到,当然,在克隆过程中还需要对容器加锁。只不过克隆会明显增加性能开销。

 

(3)隐藏迭代器

上面说了,对所有共享容器进行迭代的地方都要加锁,但是有的操作看着和迭代器没关系,其实也隐含了迭代器。

比如常见的:

System.out.println("");

因为在生成调试消息的过程中,toString对容器进行迭代。当然真正问题还是改代码所在的类不是线程安全的。在打印出对象之前必须要获取类的锁,但是一般会忽略。

 

除了上述代码,还会有一些我们常见的操作会间接的执行迭代操作:

hashCode,equals,containsAll,removeAll,retainAll等。

 

2.并发容器

上述同步容器对所有容器状态的访问都是串行化实现线程安全性的,代价是严重降低并发性。

并发容器是针对多个线程并发访问设计的,包含:ConcurrentHashMap,CopyOnWriteArrayList,ConcurrentSkipListMap,ConcurrentSkipListSet

同时还有两种新的容器类型:Queue和BlockingQueue。

Queue用来临时保存一组等待处理的元素,包括ConcurrentLinkedQueue:这是传统的先进先出队列PriorityQueue:这是优先队列(并非并发)。Queue不会阻塞。它是通过linkedList来实现的,但是它能去掉List的随机访问需求,更高效并发。

BlockingQueue增加了可阻塞的插入和获取等操作。如果为空,获取阻塞。如果满了,插入阻塞。

 

(1)ConcurrentHashMap

同步容器锁是吧整个散列表全都锁住,只能一个线程用,效率很低。独占!

与HashMap一样,ConcurrentHashMap也是基于散列的Map,但是它使用了粒度更细的加锁机制:分段锁。这种机制中,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发的修改Map

 

ConcurrentHashMap提供的迭代器不会报ConcurrentModificationException错误,因此不需要再迭代过程中对容器加锁。

ConcurrentHashMap的迭代器有弱一致性当创建迭代器时会遍历已有的元素,但不保证在迭代器在迭代器被修改后将修改操作反映给容器。同时size和isEmpty方法,返回结果不一定精确。事实上并发环境下,这两个参数也没那么重要。但是,对get,put,containsKey,remove等操作性能优化了。

 

(2)额外的原子Map操作

对于一些常见的复合操作,在ConcurrentMap中都声明了:若没有则添加,若相等则移除,若相等则替换

 

(3)CopyOnWriteArrayList

CopyOnWriteArrayList用于替换同步list,并且迭代时不需要加锁。

CopyOnWrite容器的安全性在于,只要发布一个事实不可变的对象,那么在访问该对象时就不需要再进一步同步在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。CopyOnWrite容器的迭代器保留了执行底层基础数组的引用。迭代时无需加锁。

显然每次修改容器都复制底层数组,这需要开销。所以常用语事件通知系统:查询多修改少。

 

3.阻塞队列和生产者——消费者模式

阻塞队列:如果队列满了,那么put方法将阻塞,直到有空间可用;如果队列空了,take方法阻塞,直到有元素可用

生产者消费者模式,配合阻塞队列,能够将生产和消费解耦,简化工作负载的管理。

阻塞队列同样提供了offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。这样就能灵活的处理生产者线程数量。

 

对于BlockingQueue的实现有多个。LinkedBlockingQueue和ArrayBlockingQueue是先进先出队列,与LinkedList和ArrayList类似,但是拥有更好的并发性能。PriorityBlockingQueue是按优先级排序的队列,通过比较器可以实现排序。

SynchronousQueue实际上不是真正的队列,因为它不会为队列元素维护存储空间,他维护一组线程。这些线程在等着把元素加入或移出队列。传统的队列都是先入队再出队,而这种直接交付,当交付被接受时,他就知道消费者已经得到任务,中间省去了一步。

 

(1)串行线程封闭

concurrent中实现的各种阻塞队列都包含了足够的内部同步机制,从而安全的将对象从生产者线程发布到消费者线程

封闭线程对象只能由单个线程拥有,但可以通过安全的发布该对象来转移所有权。在转移所有权之后,也只有另一个线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它

对象池利用串行线程封闭,将对象借给一个请求线程。只要对象池包含足够的内部同步来安全的发布池中对象,并且只要客户代码本身不会发布池中对象或者将对象返回对象池后就不再使用它,那么就可以安全的在线程之间传递所有权。

 

(2)双端队列和工作密取

java6新增2种容器类型:Deque和BlockDeque,他们分别对Queue和BlockQueue进行拓展。

Deque是双端队列,实现了在队列头和队列尾的高效插入和移除

消费者有各自的双端队列,如果一个消费者完成自己双端队列的全部工作,可以从其他消费者末尾秘密获取工作,减少竞争,提高效率。

 

4.阻塞方法与中断方法

阻塞状态有:BLOCKED、WAITING、TIMED_WAITING。

Thread提供了interrupt方法,用于中断线程。

 

当代码抛出一个InterruptException异常时,一般有两种选择:

(1)传递InterruptException,将它传递给方法的调用者

(2)捕获InterruptException,并通过调用当前线程上的interrupt方法恢复中断状态

 

5.同步工具类

同步工具类可以是任何对象,只要根据自身状态来协调线程的控制流。阻塞队列就是一种同步工具类。

其他的同步工具类还有:信号量(Semaphore),栅栏(Barrier),闭锁(Latch)

 

(1)闭锁

延迟线程的进度直到达到最终状态。

闭锁相当于一扇门:在闭锁到达结束状态之前,这扇门是关闭的,并且没有任何线程能通过,当到达结束状态时,允许所有线程通过。并且结束后,状态不会变,门一直打开。

常用于:只有都初始化好了才能执行的操作,所依赖的服务都好了它才能执行,玩家都准备就绪了才开始的游戏。。。

 

CountDownLatch是一种灵活的闭锁实现。它可以使一个或多个线程等待一组事件发生。

闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown递减计数器,表示事件正在发生。而await方法等待计数器到达0。

 

示例:N个线程并发执行某个任务需要的时间

public class TestHarness { public long timeTasks(int nThreads,final Runnable task) throws InterruptedException{ final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0;i<nThreads;i++){ Thread t = new Thread(){ public void run(){ try { startGate.await(); try { task.run(); }finally { endGate.countDown(); } } catch (InterruptedException e) { e.printStackTrace(); } } }; t.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } }

 

(2)FutureTask

FutureTask也可以用作闭锁。FutureTask表示的计算是通过Callable实现的,相当于可生成结果的Runnable,并且可以处于以下3种状态:等待运行,正在运行,运行完成。其中正常结束、由于取消而结束、由于异常而结束都算执行完成。当FutureTask进入完成状态后,它会永远停在这个状态上

Future.get的行为取决于任务的状态。如果已完成,返回结果。否则阻塞,直到任务进入完成状态返回结果。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,并确保传递过程能实现结果的安全发布。

 

FutureTask在Executor框架中表示异步任务,此外还可以表示一些时间较长的计算,这些计算可以在使用计算结果之前启动

public class FutureTaskPreLoad { private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>() { @Override public ProductInfo call() throws Exception { return loadProductInfo(); } } ); private final Thread thread = new Thread(future); public void start(){ thread.start(); } public ProductInfo get() throws Exception{ return future.get(); } }

 

(3)信号量

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界

Semaphore管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定,在执行操作时可以首先获得许可,并在使用后释放许可。如果没有许可,那么acquire将阻塞直到获取到许可,release方法将释放一个许可信号量。

 

Semaphore可以实现资源池,也可以将任何一种容器变成有界阻塞容器。

/** * 使用Semaphore为容器设置边界 */ public class BoundHashSet { private final Set set; private final Semaphore sem; public BoundHashSet(int bound){ this.set = Collections.synchronizedSet(new HashSet<>()); sem = new Semaphore(bound); } public boolean add(int o) throws InterruptedException { sem.acquire(); boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; }finally { if(!wasAdded){ sem.release(); } } } public boolean remove(Object o){ boolean wasRemoved = set.remove(o); if(wasRemoved){ sem.release(); } return wasRemoved; } }

 

(4)栅栏

闭锁前面说了:启动一组相关的操作,或者等待一组相关操作结束,它是一次性对象,一旦进入终止状态,不能被重置

栅栏类似于闭锁,能阻塞一组线程直到某个事件发生,所有线程必须同事到达栅栏位置,才能继续执行。用于实现一些协议。并且栅栏可以被重置以便下次使用

区别在于:闭锁用于等待事件,栅栏等待线程。闭锁一次性,栅栏可多次

 

CyclicBarrier可以使一定数量的参与方反复的在栅栏位置汇集。

通常将一个问题,拆成多个相互独立的子问题。当线程到达栅栏位置时,将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置,然后栅栏打开,所有线程释放。而栅栏则被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就认为被打破了,所有阻塞的await调用终止并抛出BrokenBarrierException。如果成功通过栅栏,await为每个线程返回一个唯一的到达索引号,我们利用索引号“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作

 

模拟程序时一般用到栅栏,比如n-body粒子模拟系统,细胞的自动化模拟。

 

另外一种形式的栅栏是Exchanger,它是一种两方栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,Exchanger会非常有用。比如一个线程向缓冲区写,一个线程读,通过Exchanger可以安全的发布给另一方。

 

6.构件搞笑且可伸缩的结果缓存

重用之前的计算结果可以提高效率,因此缓存可以将性能瓶颈转变成可伸缩瓶颈。

(1)使用HashMap和同步机制来初始化缓存

public interface Computable { BigInteger compute(String i) throws InterruptedException; } public class ExpensiveFunction implements Computable { @Override public BigInteger compute(String i) throws InterruptedException { //在经过长时间的计算后 return new BigInteger(i); } } public class Memoizer1 implements Computable { @GuardedBy("this") private final Map<String,BigInteger> cache = new HashMap<String,BigInteger>(); private final Computable c; public Memoizer1(Computable c){ this.c = c; } @Override public synchronized BigInteger compute(String i) throws InterruptedException { BigInteger result = cache.get(i); if(result == null){ result = c.compute(i); cache.put(i,result); } return result; } }

因为HashMap线程不安全,所以对整个compute方法进行同步,但是每次只有一个线程能够执行compute,这不好,可伸缩性太差。

 

(2)使用ConcurrentHashMap替换HashMap

public class Memoizer2 implements Computable { private final Map<String,BigInteger> cache = new ConcurrentHashMap<>(); private final Computable c; public Memoizer2(Computable c){ this.c = c; } @Override public BigInteger compute(String i) throws InterruptedException { BigInteger result = cache.get(i); if(result == null){ result = c.compute(i); cache.put(i,result); } return result; } }

这比1好在:多个线程可以并发使用。但是有一个不足:当两个线程同时调用,compute会导致计算得到相同的值。

为了解决这个问题,使用FutureTask,FutureTask表示一个计算的过程,这个过程可能已经完成也可能正在进行。如果完成得到结果,如果正在执行,阻塞。

 

(3)基于FutureTask的封装

public class Memoizer3 implements Computable { private final Map<String,Future<BigInteger>> cache = new ConcurrentHashMap<>(); private final Computable c; public Memoizer3(Computable c){ this.c = c; } @Override public BigInteger compute(final String i) throws Exception { Future<BigInteger> result = cache.get(i); if(result == null){ Callable eval = new Callable() { @Override public Object call() throws Exception { return c.compute(i); } }; FutureTask ft = new FutureTask(eval); result = ft; cache.put(i,ft); ft.run();//在这里将调用c.compute } try { return result.get(); } catch (ExecutionException e) { e.printStackTrace(); throw new Exception(); } } }

基于ConcurrentHashMap有很好的并发性,别的线程如果正在计算,那么新的线程不会自己去计算,只是等待计算结果。但是还有个缺陷:两个线程计算出相同值。因为if是非原子的“先检查再执行”,因此有可能两个仙童同一时间调用compute来计算相同的值。

 

(4)最终实现

public class Memoizer4 implements Computable { private final Map<String,Future<BigInteger>> cache = new ConcurrentHashMap<>(); private final Computable c; public Memoizer4(Computable c){ this.c = c; } @Override public BigInteger compute(final String i) throws Exception { Future<BigInteger> result = cache.get(i); if(result == null){ Callable eval = new Callable() { @Override public Object call() throws Exception { return c.compute(i); } }; FutureTask ft = new FutureTask(eval); result = cache.putIfAbsent(i,ft);//原子方法 cache.put(i,ft); ft.run();//在这里将调用c.compute } try { return result.get(); } catch (ExecutionException e) { e.printStackTrace(); throw new Exception(); } } }

因为这里缓存的是Future而不是值,存在缓存污染的情况:计算被取消或失败。因此一旦发现这种情况,清除缓存。同时,通过FutereTask子类来解决缓存逾期问题。

最新回复(0)