- 在看多线程和并发的时候看到书上使用并发的知识设计了一个缓存容器,觉得挺有用的,故写一篇博客记录一下。
首先,定义一个接口,接口中定义一个运算的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
interface Computable<A,V>{
V compute(A arg) throws InterruptedException; }
|
然后我们给出一个实现类,这个实现类实现最基本的功能
1 2 3 4 5 6 7 8 9
| class Function implements Computable<String,String>{
@Override public String compute(String arg) throws InterruptedException { arg = "参数为:" + arg + "的运算结果"; return arg; } }
|
然后是一个缓存类A(不知道有没有发现,这里使用了装饰者模式):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class CacheClassA<A,V> implements Computable<A,V>{ private final Map<A,V> cache = new HashMap<>(16); private final Computable<A,V> computable; public CacheClassA(Computable<A,V> cacheable){ this.computable = cacheable; } @Override public synchronized V compute(A arg) throws InterruptedException { V result = cache.get(arg); if(result == null){ result = computable.compute(arg); cache.put(arg,result); } return result; } }
|
现在给出了这三个类,那么这三个类在多线程进行运算(即调用compute)的时候有没有线程不安全的问题?
由于我们使用了synchronized对方法进行了加锁,所以很明显这个没什么线程不安全的问题,它有什么问题呢?那就是并发行下降,由于使用了同步加锁,使得这段代码在多线程的时候无法承受高并发。但是,我们也需要线程安全。总所周知,HashMap并不是一个线程安全的Map集合,如果去掉synchronized,就会导致多线程不安全的问题。于是我们将HashMap替换成ConcurrentHashMap。其他代码不变。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class CacheClassB<A, V> implements Computable<A, V> { private final Map<A, V> cache = new ConcurrentHashMap<>(16); private final Computable<A, V> computable; public CacheClassB(Computable<A, V> cacheable) { this.computable = cacheable; } @Override public V compute(A arg) throws InterruptedException { V result = cache.get(arg); if (result == null) { result = computable.compute(arg); cache.put(arg, result); } return result; } }
|
在此段代码中,只是将HashMap改变成了ConcurrentHashMap。那么这段代码安全么,在当前场景是安全的,但是如果是其他场景就会有问题。只不过在此处这个代码不安全的表现在进行了重复计算。
比如两个线程,A和B同时进入compute的代码,他们运行的是同一个值(即arg相同)。此时由A先进入到if判断result=null,开始运算。但是由于这个运算的时间特别长,导致B进入判断的时候还没有出结果,于是B也开始进行了这个运算。这就造成了重复计算。与我们的目的就偏离了。
我们释放掉CacheClassB和Function类的注释来模拟长时间运算的场景,测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) { Computable<String,String> computer = new Function(); CacheClassB<String,String> cachedComputer = new CacheClassB<>(computer); Thread threadA = new Thread(()->{ try { cachedComputer.compute("1"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread threadB = new Thread(()->{ try { cachedComputer.compute("1"); } catch (InterruptedException e) { e.printStackTrace(); } }); threadA.start(); threadB.start(); }
|
运行结果为:
1 2 3 4 5
| 计算了:1的值 计算了:1的值
Process finished with exit code 0
|
我们发现运行了两次,也就验证了我们的猜测。在第二个线程中并没有从缓存中取值,而是又进行了一次运算。
那么我们希望通过某种方法表示:”线程X正在计算”这种情况,那么对另外一个线程计算同一个值,它能够直到最高效的方法是等待X计算结束。这里我们使用FutureTask来实现这个功能。也能使用其他同步工具类来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| class CacheClassC<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(16); private final Computable<A, V> computable;
public CacheClassC(Computable<A, V> cacheable) { this.computable = cacheable; }
@Override public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg); if (f == null) { System.out.println("计算了:" + arg + "的值"); Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return computable.compute(arg); } }; FutureTask<V> ft = new FutureTask<>(eval); f = ft; cache.put(arg, ft); ft.run();
} V result = null; try { result = f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return result; } }
|
注意对比和CacheClassB的区别。我们利用Future对应map中的参数,如果通过参数能取到Future那么证明这个参数是计算过的。可以直接返回。
如果不能取到的话,我们也能防止长时间的运算导致重复计算的情况(当然,这里也可能会重复计算,如果多个线程同时进入if条件中的话。但是由于相比耗时长的compute操作,这个概率会小很多)。(鉴于直接运行两个线程太快,所以这里不做测试了。只要直到这里重复计算的概率会小很多就行了。)
由于if代码块仍然是非原子的”先检查在执行”,所以两个线程仍然可能在同一时间调用compute来计算相同的值。
原因是复合操作”若没有则添加”是在底层的Map对象执行的,而这个对象无法通过加锁来保证原子性。所以我们使用putIfAbsent方法来解决这个漏洞
同时在run方法调用的时机多做一次判断,再次减小,A和B线程真就那么同时在几乎相同的时刻计算同一个值导致重复计算的概率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| class CacheClassD<A, V> implements Computable<A, V> { private final Map<A, Future<V>> cache = new ConcurrentHashMap<>(16); private final Computable<A, V> computable;
public CacheClassD(Computable<A, V> cacheable) { this.computable = cacheable; }
@Override public V compute(A arg) throws InterruptedException {
Future<V> f = cache.get(arg); if (f == null) { Callable<V> eval = new Callable<V>() { @Override public V call() throws Exception { return computable.compute(arg); } }; FutureTask<V> ft = new FutureTask<>(eval); f = cache.putIfAbsent(arg,ft); if(f == null){ System.out.println("计算了:" + arg + "的值"); f = ft; ft.run(); } } V result = null; try { result = f.get(); } catch (ExecutionException e) { e.printStackTrace(); } return result; } }
|
注意,这里改变了一下compute中打印代码的位置,因为只有进入第二层if的时候才是真正的调用了compute方法的
1 2 3
| 计算了:1的值
Process finished with exit code 0
|
OK,我们看到这里只进行了一次计算。这就是对一个同步代码的高并发优化。
顺便一提:CacheClassD还是存在问题的,鉴于这里只讨论并发优化,所以还有的问题只提一下
当缓存的是Future而不是值的时候,将会导致缓存污染问题(Cache Pollution):如果某个计算被取消或失败,那么在计算这个结果时将指明计算过程被取消或者失败。为了避免这种情况,还要在CacheClassD的代码上添加如果发现计算被取消,或者检测到RuntimeException的时候移除Future的功能。这个代码才算完美。
CacheClassD没有解决缓存逾期的问题,它可以通过使用FutureTask的子类来解决,在子类为每个结果指定一个逾期时间,并且定期扫描逾期的元素。
CacheClassD没有解决缓存清理的问题,即移除旧的计算结果以便为新的计算结果腾出空间,从而不会消耗过多的内存。