Java Code

Java中的观察者模式和观察订阅模式

Posted on 2021-04-01,6 min read

在Java中为我们提供了观察者模式的相关接口和抽象ObservableObserver分别对应了观察者模式的观察者和消费者。但是这两个类在Java9的时候打上了@Deprecated表明这两个接口已经过时了,取而代之的是叫我们使用Flow中的内部类。

  • 通过查看代码,我们能发现,在Observable接口中,大量的使用了synchronized关键字来保证并发的安全性,同时存放订阅者的接口也是Java中的废弃API vector类。
  • 在介绍观察者模式的时候我们谈到过观察者模式的缺点,由于是单线程,所以可能会导致出现在通知的时候,一个观察者被阻塞了(可能在执行耗时的操作),而导致后续的任务都被阻塞。而再Observable中为了保证同步安全,大量的使用了synchronized关键字进行加锁,这就导致了在并发下,性能会进一步消耗。

为了解决这个问题,在JDK9中引入了Flow框架来实现发布订阅模式,这也是JDK9实现响应式编程的一个标志。

首先来对比以下观察者模式和发布订阅模式的区别(有种说法是发布订阅模式就是观察者模式,但是实际上并不一样,或者说发布订阅模式是观察者模式的升级版):

  • 在观察者模式中,我们有两个重要的角色:观察者和被观察者,其中被观察者在执行了某些操作之后会通知观察者,观察者接收到了通知后会进行响应的操作。
  • 而在发布订阅模式中,我们有三个重要角色:发布者,订阅者和代理。其中发布者对应了观察者模式中的被观察者,而订阅者对应了观察者模式中的观察者。和观察者模式不同的是被观察者并不知道观察者,观察者也不知道被观察者。他们知道的是:代理(Broker)。消息队列就是典型的发布订阅模式,所以很多概念能直接类比。

在发布订阅模式中,发布者和订阅者是松耦合的,并且通过代理(Broker)来进行交流,而在观察者模式中被观察者和观察者是耦合在一起的。并且发布订阅模式通常是异步的。

JDK9中引入了Flow类来方便我们实现发布订阅模式,其中有四个内部类:

Subscriber Interface(订阅者订阅接口)

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);	// 在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription订阅令牌对象通过此方法传递给订阅者。
    public void onNext(T item); // 下一个待处理的数据项的处理函数,相当于Observer的update,具体的处理逻辑就是在这里处理的。
    public void onError(Throwable throwable); // 在发布者或订阅遇到不可恢复的错误时调用
    public void onComplete(); // 当没有订阅者调用(包括onNext()方法)发生时调用。
}

Subscription Interface (订阅令牌接口)

  • 订阅令牌对象通过Subscriber.onSubscribe()方法传递
public static interface Subscription {
    public void request(long n); // 是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据,超过了就不会处理了。可以将其设置成Long.MAX_VALUE来表示无限个。
    public void cancel(); // 由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息
}

Publisher Interface(发布者接口)

  • 默认情况下,我们一般不使用publisher接口,而是使用他的实现类SubmissionPublisher,在实现类里已经为我们提供了相应的实现,并且是异步的。
@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber); // 调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。


}

Processor Interface(处理器接口)

  • 处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

相关代码

package cn.gloduck.spider;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PublisherDemo {
    public static void main(String[] args) throws InterruptedException {
        // 定义发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 添加一个订阅者
        publisher.subscribe(new MySubscriber());
        for (int i = 0; i < 10; i++) {
            // 发布10个消息
            publisher.submit("消息内容为:" + i);
        }
        // 暂停一秒钟,因为publisher的发送是异步的。
        TimeUnit.SECONDS.sleep(1);
    }

    /**
     * 订阅者
     */
    private static class MySubscriber implements Flow.Subscriber<String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("当前订阅者添加了订阅");
            // 表明当前订阅者只处理两个消息。
            subscription.request(2);
            // 在类中保存一个令牌的备份
            this.subscription = subscription;
        }

        @Override
        public void onNext(String item) {
            // 接收并处理发布者发布的消息
            System.out.printf("接收到了消息为:%s\n", item);
        }

        @Override
        public void onError(Throwable throwable) {
            // 出现了异常(例如处理数据的时候产生了异常)
            throwable.printStackTrace();

            // 我们可以告诉发布者, 后面不接受数据了
            this.subscription.cancel();
        }

        @Override
        public void onComplete() {
            System.out.println("处理完了!");
        }
    }
}


下一篇: Graalvm反射配置辅助工具agentlib→

loading...