在Java中为我们提供了观察者模式的相关接口和抽象Observable
和Observer
分别对应了观察者模式的观察者和消费者。但是这两个类在Java9的时候打上了@Deprecated
表明这两个接口已经过时了,取而代之的是叫我们使用Flow中的内部类。
- 通过查看代码,我们能发现,在
Observable
接口中,大量的使用了synchronized关键字来保证并发的安全性,同时存放订阅者的接口也是Java中的废弃API vector类。 - 在介绍观察者模式的时候我们谈到过观察者模式的缺点,由于是单线程,所以可能会导致出现在通知的时候,一个观察者被阻塞了(可能在执行耗时的操作),而导致后续的任务都被阻塞。而再
Observable
中为了保证同步安全,大量的使用了synchronized关键字进行加锁,这就导致了在并发下,性能会进一步消耗。
为了解决这个问题,在JDK9中引入了Flow框架来实现发布订阅模式,这也是JDK9实现响应式编程的一个标志。
首先来对比以下观察者模式和发布订阅模式的区别(有种说法是发布订阅模式就是观察者模式,但是实际上并不一样,或者说发布订阅模式是观察者模式的升级版):
- 在观察者模式中,我们有两个重要的角色:观察者和被观察者,其中被观察者在执行了某些操作之后会通知观察者,观察者接收到了通知后会进行响应的操作。
- 而在发布订阅模式中,我们有三个重要角色:发布者,订阅者和代理。其中发布者对应了观察者模式中的被观察者,而订阅者对应了观察者模式中的观察者。和观察者模式不同的是被观察者并不知道观察者,观察者也不知道被观察者。他们知道的是:代理(Broker)。消息队列就是典型的发布订阅模式,所以很多概念能直接类比。
在发布订阅模式中,发布者和订阅者是松耦合的,并且通过代理(Broker)来进行交流,而在观察者模式中被观察者和观察者是耦合在一起的。并且发布订阅模式通常是异步的。
JDK9中引入了Flow类来方便我们实现发布订阅模式,其中有四个内部类:
Subscriber Interface(订阅者订阅接口):
1 2 3 4 5 6
| public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
|
Subscription Interface (订阅令牌接口):
- 订阅令牌对象通过Subscriber.onSubscribe()方法传递
1 2 3 4
| public static interface Subscription { public void request(long n); public void cancel(); }
|
**Publisher Interface(发布者接口)**:
- 默认情况下,我们一般不使用publisher接口,而是使用他的实现类
SubmissionPublisher
,在实现类里已经为我们提供了相应的实现,并且是异步的。
1 2 3 4 5 6
| @FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber);
}
|
Processor Interface(处理器接口):
- 处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。
1 2
| public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
|
相关代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| package cn.gloduck.spider;
import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit;
public class PublisherDemo { public static void main(String[] args) throws InterruptedException { SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); publisher.subscribe(new MySubscriber()); for (int i = 0; i < 10; i++) { publisher.submit("消息内容为:" + i); } TimeUnit.SECONDS.sleep(1); }
private static class MySubscriber implements Flow.Subscriber<String> { private Flow.Subscription subscription;
@Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("当前订阅者添加了订阅"); subscription.request(2); this.subscription = subscription; }
@Override public void onNext(String item) { System.out.printf("接收到了消息为:%s\n", item); }
@Override public void onError(Throwable throwable) { throwable.printStackTrace();
this.subscription.cancel(); }
@Override public void onComplete() { System.out.println("处理完了!"); } } }
|