设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋
2020/4/2 17:01:32
本文主要是介绍设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
何为事件监听者模式 ?
第一就是为啥我强调事件二字 ,因为他是目标 . 在我们开发中绝对见到过一堆后缀是
Listener
的类, 这个就是监听者模式, 监听者模式是一种CS开发架构
,很好的做了一种设计的解耦,监听者注册到一个邮局中,订阅某种事件(提前说好了), 邮局会按需求发布消息, 监听者会及时收到消息来处理 . 其中整个Java开发环境中 , JDK已经帮我们定义好了接口 , Spring就是基于JDK接口下实现的, Guava则是另一种实现方式, 各有千秋 , 大家看看吧 , 到底是回调好还是阻塞, 还是Guava这种方式呢, 他和观察者模式有何区别呢 ? 我们有机会在讲
1. Java原生规范
1. EventObject
事件对象 , 他需要一个事件源 , 用构造函数传递的
public class EventObject implements java.io.Serializable { protected transient Object source; public EventObject(Object source) { if (source == null) throw new IllegalArgumentException("null source"); this.source = source; } ........... 其他省略 } 复制代码
2. EventListener
事件监听者,他是负责监听事件的 , JAVA提供的是一个空接口, 让我们根绝需求写
public interface EventListener { } 复制代码
3. 总结
我们发现 java 提供的只提供了一个事件对象 ,和一个事件监听器 ,所以需要我们遵守这个规范去开发
2. Java规范设计一个监听者模式 - 基于回调模式
1. 事件源 - EventSource
一般情况下 都会设置成一个 Object 类型的 , 不需要我们去设计一个,为了体现设计模式的角色,我们就设计了一个
@ToString @Setter @Getter public class EventSource { private String name; private String info; } 复制代码
2. 事件对象 - EventObject
这里我们继承了 EventObject , 只是简单的实现了一下 , 并没有做过多的包装
public class CoreEventObject extends EventObject { public CoreEventObject(EventSource source) { super(source); } } 复制代码
3. 事件监听者 - EventListener
这里我们真正的监听者 , 一般情况下都需要设计成一个 函数式接口 , 我这个是和Spring框架学习的 , 因为函数式接口才能体现回调 ,
@FunctionalInterface public interface CoreEventListener<E extends CoreEventObject> extends EventListener { void onEventObject(E event); } 复制代码
4. 事件发布者 - EventPublisher
事件发布者 ,因为没有发布的事件对象, 哪来的监听
public class EventPublisher<E extends CoreEventObject> { private CoreEventListener<E> listener; public EventPublisher(CoreEventListener<E> listener) { this.listener = listener; } public void publish(E object){ System.out.println("发布事件 : " + object); // 传给 CoreEventListener listener.onEventObject(object); } } 复制代码
5. 测试Demo
public class TestDemo { public static void main(String[] args) { // 1. 创建一个事件发布者 EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() { @Override public void onEventObject(CoreEventObject event) { System.out.println("接收到事件源 : " + event.getSource() + " , 当前线程 : " + Thread.currentThread().getName()); } }); // 2. 发布一个事件对象 publisher.publish(getCoreEventObject()); } private static CoreEventObject getCoreEventObject(){ ..... 此处省略 return eventObject; } } 复制代码
输出结果 :
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)] 接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 当前线程 : main 复制代码
我们发现我们成功的接收到了事件对象 和 事件源 , 这个就是钩子函数的魅力 . 其实你只是做了一个事件发布你无心观察其他的东西 , 只需要一个监听者就可以做到监听了 , 这样你的事件发布 和 监听 完全就解耦了 .其实底层就是一个地址引用 .
3. 回调函数存在的问题 ?
1. 我们的问题 ?
很多场景下,我们的发布事件和监听事件完全在两个线程中,那么我们如何拿到事件对象呢 ?
如果我们简单使用一下 , 会这么写 ?
public class TestEventListener implements CoreEventListener<CoreEventObject> { private CoreEventObject object; @Override public void onEventObject(CoreEventObject object) { // 赋值给成员变量 this.object = object; } // 获取成员变量 public CoreEventObject getObject() { return object; } } 复制代码
测试一下 :
public class TestDemo { public static void main(String[] args) { TestEventListener listener = new TestEventListener(); CoreEventObject object = listener.getObject(); // 先去拿 ,后去发布 System.out.println(object.getSource()); EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener); publisher.publish(getCoreEventObject()); } private static CoreEventObject getCoreEventObject(){ .... return eventObject; } } 复制代码
输出结果
Exception in thread "main" java.lang.NullPointerException at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28) 复制代码
有些人就会说 , 你这不对哇 ,你当然拿不到了 ,因为人家还没发布了 ,但是在多线程 ,在解耦的情况下 ,你哪知道对面何时发布结束了 , 你再去拿呢 ? 那就需要java的多线程知识了 ,Future 给我们带来了提醒 , 就是阻塞的思想 , 只有监听者真正的收到对象 , 我们才能去拿 .
2. 解决问题
了解过我前面提到的那一节 FutureTask
是如何实现的 ,我觉得问题就迎刃而解了 .
public class TestEventListener implements CoreEventListener<CoreEventObject> { private CoreEventObject object; /** * 当 X = 0 ,代表 obj还没有初始化了 * 当 x = 1 , 代表 obj 以及初始化了 , 已经接收到了 */ private static volatile int x = 0; @Override public void onEventObject(CoreEventObject object) { this.object = object; // 收到改成 1 x = 1; } public CoreEventObject getObject() { while (true) { if (x == 1) { break; } } // 拿到对象,再设置为1 x = 0; return object; } } 复制代码
由于这个解决方案,会使得执行getObject()
的线程一直的阻塞下去,就是死循环下去,我们必须一个线程去执行这个方法 ,
public class TestDemo { public static void main(String[] args) { TestEventListener listener = new TestEventListener(); // 新建一个线程去接收 Thread thread = new Thread(() -> { System.out.println("我开始接收对象 : " + System.currentTimeMillis()); CoreEventObject object = listener.getObject(); System.out.println("成功接收对象 : "+object.getSource()); }); thread.start(); // 新建一个线程去发布 EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener); new Thread(()->{ publisher.publish(getCoreEventObject()); }).start(); } private static CoreEventObject getCoreEventObject(){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } EventSource source = new EventSource(); source.setName("事件源"); source.setInfo("" + System.currentTimeMillis()); return new CoreEventObject(source); } } 复制代码
输出结果 :
我开始接收对象 : 1573282924555 发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)] 成功接收对象 : EventSource(name=事件源, info=1573282925590) 复制代码
我们我们接收的时候是在1573282924555的时间搓 , 而真正拿到的对象确实在1573282925590发布的 , 这个就完全在俩时间轴上,所以我们成功的解决了问题 .
4. Spring 中的 ApplicationListener
1. ApplicationEvent
Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.
此类被所有的
application events
所继承 。抽象的原因是因为直接发布这个ApplicationEvent
是没有意义的。
2. ApplicationListener
Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.
这个接口被所有的
application event listeners.
所实现 , 基于Java的java.util.EventListener
接口规范
3. 开始使用
我们有一个需求就是 ,我们有一个服务会从远程不断的去拉去配置信息 ,一旦有改变就会发布配置信息 .
1. Config - 事件源
@ToString @Setter @Getter public class Config { private String namespace; private Map<String, Object> info; } 复制代码
2. ConfigEvent - 事件对象
// 这个注解,我们是根据Spring源码看到的 , 所以一致性,我就加了 @SuppressWarnings("serial") public class ConfigEvent extends ApplicationEvent { public ConfigEvent(Config source) { super(source); } } 复制代码
3. ConfigEventListener - 事件监听者
@Component public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } // 保证执行顺序 , 多个 ConfigEventListener就需要实现这个接口 @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } // 初始化以后要做什么 ? @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前ConfigEventListener"); } } 复制代码
5 . ConfigServer - 配置中心服务
@Service public class ConfigServer { // 注入applicationContext,因为只有他才可以执行发布事件 @Autowired private ApplicationContext applicationContext; // 这个是开启异步 ,后面会说到 // @Async public void publishConfig(){ // 需要发布 --- > 改变的事件 System.out.println("发布事件成功 , 当前线程 : "+Thread.currentThread().getName()); applicationContext.publishEvent(getChange()); } public ConfigEvent getChange(){ Config config = new Config(); config.setNamespace("application"); HashMap<String, Object> conf = new HashMap<>(); conf.put("server.port", 8088); config.setInfo(conf); return new ConfigEvent(config); } } 复制代码
6. 启动测试
@SpringBootApplication public class SpringListenerApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringListenerApplication.class, args); } @Autowired private ConfigServer server; @Override public void run(String... args) throws Exception { server.publishConfig(); } } 复制代码
输出结果 :
..... 初始化当前ConfigEventListener .... 发布事件成功 , 当前线程 : main 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : main 复制代码
所以一个Spring-Boot
的事件监听还是很简单的 ,类比到 Spring
一个道理,相信懂得人都知道 . 但是又一个问题是我们的 发布和监听都是 main线程 ,不好吧 ,玩意有很多事件了 ?
4. 开启异步发布
需要两个注解 @EnableAsync
启动Async功能 , 和 @Async
某个方法使用异步执行
发布事件成功 , 当前线程 : SimpleAsyncTaskExecutor-1 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : SimpleAsyncTaskExecutor-1 复制代码
我们发现就出现了线程池执行 , 这个理的线程池 ,是可以进行配置的 , 只需要我们显式的注入下面这个SimpleAsyncTaskExecutor
Bean 就可以了
@Bean public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() { SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(); // 需要传入一个 ThreadFactory实现类 , 所以看过我前面写的文章应该会写这个,比如 JUC- Executor那节 executor.setThreadFactory(new MyThreadFactory("anthony")); return executor; } 复制代码
输出结果 :
发布事件成功 , 当前线程 : anthony-1 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 复制代码
5 . 多个监听器顺序执行
你可以跟我一样选择实现 ApplicationListener
和Ordered
,或者你可以直接实现 SmartApplicationListener
都一样的哈,没有哪个好哪个不好
监听器 一 :
@Component public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前监听器 : " + this.toString()); } } 复制代码
监听器 二 :
@Component public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean { @Override public void onApplicationEvent(ConfigEvent event) { System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName()); } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE-1; } @Override public void afterPropertiesSet() throws Exception { System.out.println("初始化当前监听器 : " + this.toString()); } } 复制代码
输出结果 :
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f 初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289 ..... 发布事件成功 , 当前线程 : anthony-1 ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 ConfigEventListenerEnd 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1 复制代码
5. Guava 中的 EventBus
Guava的EventBus 就是一个很好的事件注册发布的管理工具 , 他属于一种推送的模式 , 跟spring的很相似,
1. 依赖
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.0-jre</version> </dependency> 复制代码
2. 快速开始
主要的对象就是 ,
EventBus
事件总线, 他是管理所有监听者(或者叫做订阅者) ,通过EventBus#register
或者EventBus#unRegister
来管理的 , 同时监听者要有监听的事件 , 这里是基于方法级别的 (注意方法只能有一个参数,就是监听的事件), 需要在方法上加上@Subscribe
注解来表示监听 ,EventBus
可以通过EventBus#post
方法来发布事件 , 对应类型的监听者就会收到 . 同时EventBus
可以处理异常
public class QuicklyStart { public static void main(String[] args) { // 创建一个 事件总线 EventBus bus = new EventBus(new SubscriberExceptionHandler() { @Override public void handleException(Throwable exception, SubscriberExceptionContext context) { // 处理订阅者异常信息 System.out.println("异常信息 : "+exception.getMessage() + ", 异常事件 : " + context.getEvent()); } }); // 注册你的监听器 , 其实更加准确来说是订阅者 , 他属于一种发布订阅模式 bus.register(new EventListener()); // 事件总线发布事件 bus.post("sb"); // 事件总线发布事件 bus.post(new Event("hello Guava")); } } /** * 事件源 */ class Event { String msg; public Event(String msg) { this.msg = msg; } @Override public String toString() { return "Event{" + "msg='" + msg + '\'' + '}'; } } /** * 监听器 */ class EventListener { /** * {@link Subscribe} 一个这个代表一个订阅者,EventBus会将符合的事件发布到对应的订阅者上 , 但是不支持java的基本数据类型, int 之类的 * * @param event */ @Subscribe public void onEvent(Event event) { System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event); } @Subscribe public void onStringEvent(String event) { error(); // 模拟异常 System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event); } private void error() { int i = 1 / 0; } } 复制代码
输出 :
异常信息 : / by zero, 异常事件 : sb 当前线程 : main, 接收到事件 : Event{msg='hello Guava'} 复制代码
3. 基本原理
其实很简单 , 第一注册的时候 :
/** Registers all subscriber methods on the given listener object. */ void register(Object listener) { // 其实就是注解扫描 , 然后把元信息都给整出来 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); // 查找有没有 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); // 没有创建一个对象 if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } // 添加进去 ,其实是 Subscriber对象 , 把method信息, 对象信息全部封装进去了 eventSubscribers.addAll(eventMethodsInListener); } } 复制代码
第二就是 发布
public void post(Object event) { // 根据事件获取对应的 Subscriber Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 发布出去 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } } @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); // 获取当前线程的队列 ,用ThreadLocal维护的线程安全, 其实是为了安全 Queue<Event> queueForThread = queue.get(); // 创建一个事件对象 queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { // 处理 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } /** Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { // 关键点 ,就是这 invokeSubscriberMethod(event); } catch (InvocationTargetException e) { // 出现异常直接就 ... 调用异常回调方法了 bus.handleSubscriberException(e.getCause(), context(event)); } } }); } @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { // 原来如此 , .........method.invok 真.... 所以可以抓取异常 method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } 复制代码
所以这个玩意很简单 , 原理一看就分析出来了
6. 总结
我们发现我们的自己实现的监听者和 Spring
和Guava
这俩种实现有啥区别 , 无非就是我们自己实现的监听者模式, 对于 listener 的管理,没有做 , 我们只是一个 Publisher 一个 Listener,一对一的关系 , 这样子就很不好, 100个监听者就需要100个发布者 , 不符合设计模式的原则 , 所以参考Guava
,我们发现他无非做的就是一个对于Listener 的管理 , 但是有一个细节希望大家知道, 对于监听者模式 , 万一事件发布失败了 , 我们如何知道, 所以Guava
至少帮我们做了 , 他不是基于回调机制的, 而是使用了Java
的 Method#invoke
,看需求而定吧 , 只不过回调更加轻量级,
这篇关于设计模式 - 事件监听者模式 - JDK & Spring & Guava 各有千秋的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-25外企也半夜发布上线吗?
- 2024-05-24鸿蒙原生应用再新丁!芒果TV 入局鸿蒙
- 2024-05-22基本概念
- 2024-05-22检索数据
- 2024-05-22排序数据
- 2024-05-22基础过滤数据
- 2024-05-22通过逻辑操作符过滤数据
- 2024-05-22通过通配符过滤数据
- 2024-05-22字段的拼接与计算
- 2024-05-22聚合函数