浅谈业务解耦小工具 - Spring Event

以云看科技2024-04-26 14:16:46  126

一. 是什么

官网定义

"An event is an object that is created when something notable happens within an application, such as a user clicking a button or an HTTP request arriving at a web server. An event listener is an object that listens for events, and the code that handles the event is the event handler. Event handlers are invoked by an event publisher. The act of invoking an event is called publishing an event."

该定义强调了事件、事件监听器和事件处理器的概念,并将事件作为应用程序中有意义的操作所创建的对象。同时,事件监听器和事件处理器的作用也被明确描述为监听事件并做出响应。此外,官方定义还提到了事件发布者和发布事件的概念。

Spring Event 是 Spring 框架中的一个事件驱动模型,可以用来在不同的组件之间传递消息和响应事件。

优点:

降低组件之间的耦合度:通过使用 Spring Event,组件之间不需要显式地相互引用,而是通过事件进行交互,从而降低了它们之间的耦合度。

提高应用程序的可扩展性:由于组件之间的耦合度降低,因此在应用程序需要进行扩展或修改时,更容易添加或删除组件。

提高代码的重用性:由于事件监听器是一个单独的组件,因此它可以被多个事件源重复利用,从而提高代码的重用性。

提供了异步事件处理的支持:Spring Event 可以提供异步事件处理的支持,从而避免事件处理的阻塞,提高应用程序的性能。

方便进行事件的测试和模拟:由于事件机制是松散耦合的,因此很容易对事件进行测试和模拟,从而提高了代码的可测试性。

总的来说,Spring Event 提供了一种松散耦合的机制,通过事件的方式进行组件之间的交互,使得应用程序更加灵活、可扩展和可测试。

??缺点:

过度使用事件机制可能导致代码复杂度增加:在一个应用程序中过度使用事件机制可能导致代码结构变得复杂,这可能会使程序的维护和扩展变得困难。

可能会影响应用程序的性能:在事件处理中,当事件的数量较多时,事件监听器的执行可能会占用较多的 CPU 资源,从而影响应用程序的性能。

可能会导致事件的乱序执行:由于 Spring Event 是基于观察者模式实现的,因此事件监听器之间的执行顺序并不是确定的,这可能会导致事件的乱序执行。

可能会对调试造成一定的困难:由于事件机制中事件的触发和处理过程通常是异步的,因此在调试应用程序时可能会遇到一些困难。

因此,开发人员在使用 Spring Event 时应该根据具体的业务需求和场景来判断是否需要使用事件机制,并合理利用该机制来优化应用程序的架构。

二. 怎么用

定义事件

继承ApplicationEvent,将需要传递的事件信息包装为业务事件类,如下所示。

java复制代码public class FinishLvEvent extends ApplicationEvent { /** * 闯关结果 */ private String result; /** * 事件处理后返回内容容器 */ private Object response; //getter setter... public FinishLvEvent(Object source, String result, Object response) { super(source); this.result = result; this.response = response; }}

事件发布

注入ApplicationEventPublisher,通过publishEvent方法即可发布事件。ApplicationEventPublisher的一般实现是AbstractApplicationContext,即Spring容器本身就是可以发送Spring事件。

java复制代码@Servicepublic class AuditService { @Autowired private ApplicationEventPublisher applicationEventPublisher; public void audit(int auditStatus, long auditId) { // 修改数据库审核状态 auditMapper.updateStatusById(auditStatus, auditId); // 发送审核事件 applicationEventPublisher.publishEvent(new AuditEvent(this, auditStatus, auditId)); }}

事件监听

实现对应事件的监听器,需要继承ApplicationListener,并实现onApplicationEvent方法。通过泛型指定需要监听的事件类。

java复制代码@Componentpublic class AuditEventListener implements ApplicationListener { @Override public void onApplicationEvent(AuditEvent auditEvent) { // 事件处理 }}

也可以使用@EventListener注解方法,将其包装为事件处理器。它适用于:

1. 不想为每个事件处理都创建一个ApplicationListener实现类;

2. 希望支持更复杂的事件条件过滤。

@EventListener的classes属性可以过滤事件类型,而condition属性可以根据事件对象是否满足条件表达式来过滤事件。

java复制代码@Componentpublic class EventProcessor { @EventListener(classes={CustomEvent.class}, condition="#event.status==1") public void process(CustomEvent event) { // 事件处理 }}

异步处理

默认情况下,ApplicationListener处理事件是同步执行的。如果要处理耗时操作,希望异步执行,有三种方案:

1. 自定义ApplicationEventMulticaster

后面可以看到,Spring容器会优先使用beanName为applicationEventMulticater 的bean作为事件转发处理器,如果不存在则默认使用SimpleApplicationEventMulticaster作为事件转发处理器,它默认是同步执行的。但它支持设置Executor,那么我们可以将自定义的线程池处理器作为Executor,以此来支持异步执行。

java复制代码@Configurationpublic class Config { @Bean public SimpleApplicationEventMulticaster applicationEventMulticater { SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster; multicaster.setTaskExecutor(Executors.newFixedThreadPool(1)); // 自定义线程池 return multicaster; }}

不过这样设置之后,所有Listener都是异步执行。对于需要同时支持异步和同步执行的应用,考虑下面的方案。

2. 手动提交线程池异步执行

适用于简单的场景。但这样对代码的侵入性比较高,

java复制代码private Executor executor = Executors.newFixedThreadPool(1);public void onApplicationEvent(CustomEvent event) { executor.execute( -> process(event));}

3. 使用@Async注解

对于@EventListener注解的方法处理器,可以使用@Async注解,使得方法调用变为异步调用。(需要在启动类Application上使用@EnableAsync注解,引入对@Async的支持。)

三. 源码分析

核心类、方法

ApplicationEvent

抽象类:所有的 Spring Event 都是继承自该抽象类。它定义了一个 source 字段,表示事件源,以及相关的 getter 和 setter 方法。

ApplicationListener

接口:所有的 Spring Event 监听器都需要实现该接口,实现 onApplicationEvent 方法来处理事件。

ApplicationEventPublisher

接口:事件发布者,定义了 publishEvent 方法,用于发布事件。

ApplicationEventPublisherAware

接口:通过实现该接口,可以让 bean 感知到事件发布者,并将其保存在一个成员变量中。

ApplicationEventMulticaster

接口:事件广播器,用于将事件广播给所有的监听器。Spring 默认提供了 SimpleApplicationEventMulticaster 和 AsyncApplicationEventMulticaster 两种实现。

AbstractApplicationEventMulticaster

抽象类:实现了 ApplicationEventMulticaster 接口的大部分方法,提供了一些基础的实现逻辑。SimpleApplicationEventMulticaster 和 AsyncApplicationEventMulticaster 都继承自该类。

GenericApplicationListener

接口:它是 ApplicationListener 接口的子接口,定义了一个泛型类型 E,用于指定监听的事件类型。

SimpleApplicationEventMulticaster

类:实现了 ApplicationEventMulticaster 接口,用于将事件广播给所有的监听器。

AsyncApplicationEventMulticaster

类:实现了 ApplicationEventMulticaster 接口,用于异步地将事件广播给所有的监听器。

ApplicationEventPublisherAwareProcessor

类:实现了 BeanPostProcessor 接口,用于将事件发布者自动注入到实现了 ApplicationEventPublisherAware 接口的 bean 中。

这些类和接口是 Spring Event 的核心,它们提供了事件的定义、监听器的注册和处理、事件发布者的注入等重要功能。

预留事件

Spring Framework 中预留了多个事件,包括但不限于以下事件:

ContextRefreshedEvent:在 ApplicationContext 被初始化或刷新时,该事件被发布。这也可以在 ConfigurableApplicationContext 接口中使用 refresh 方法来发生。

ContextStartedEvent:在启动时,该事件被发布,即在调用 ConfigurableApplicationContext 接口中的 start 方法时。

ContextStoppedEvent:在停止时,该事件被发布,即在调用 ConfigurableApplicationContext 接口中的 stop 方法时。

ContextClosedEvent:在 ApplicationContext 被关闭时,该事件被发布。这也可以在 ConfigurableApplicationContext 接口中使用 close 方法来发生。

RequestHandledEvent:在 Web 请求被处理后,该事件被发布。此事件仅适用于使用 Spring 的 Web 模块。

ServletRequestHandledEvent:在 Web 请求被处理后,该事件被发布。此事件仅适用于使用 Spring 的 Web 模块。

SessionDestroyedEvent:在 Web 会话被销毁时,该事件被发布。此事件仅适用于使用 Spring 的 Web 模块。

基于预留事件进行扩展Demo

完成后发布自定义事件。可以使用

@EventListener 注解或 ApplicationListener 接口来监听 ContextRefreshedEvent 事件,例如:

java复制代码@Componentpublic class MyContextRefreshedEventListener implements ApplicationListener { @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 发布自定义事件 event.getApplicationContext.publishEvent(new MyContextRefreshedEvent(this)); }}

创建一个继承自 ApplicationListener 的自定义事件监听器,并实现 onApplicationEvent 方法来执行自定义的初始化操作。例如,可以创建一个 MyContextRefreshedEventListener 类:

java复制代码@Componentpublic class MyContextRefreshedEventListener implements ApplicationListener { @Override public void onApplicationEvent(MyContextRefreshedEvent event) { // 执行自定义的初始化操作 System.out.println("MyContextRefreshedEvent received, do some initialization..."); }}

通过以上步骤,当 Spring 容器启动完成后,会触发 ContextRefreshedEvent 事件,从而发布 MyContextRefreshedEvent 自定义事件,并通过自定义事件监听器 MyContextRefreshedEventListener 来执行自定义的初始化操作。

事件发布与转发

ApplicationEventPublisher

Spring容器(父类AbstractApplicationContext)把自身作为ApplicationEventPublisher类的实现注册到容器中

java复制代码// AbstractApplicationContext#prepareBeanFactorybeanFactory.registerResolvableDependency(ApplicationEventPublisher.class, this);

最终将类型、实例注册到:

java复制代码private final Map, Object> resolvableDependencies = new ConcurrentHashMap<>(16);

AbstractApplicationContext作为ApplicationEventPublisher,实现了publishEvent方法,将其转发给内部实现:

包装非ApplicationEvent对象为PayloadApplicationEvent

在Listener初始化之前保存早期发布的事件

在初始化之后直接给交给applicationEventMulticaster发布事件

同时将事件发布给父容器(如果有,且是AbstractApplicationContext实现)

java复制代码// AbstractApplicationContext@Overridepublic void publishEvent(ApplicationEvent event) { publishEvent(event, null);}@Overridepublic void publishEvent(Object event) { publishEvent(event, null);}// 具体实现protected void publishEvent(Object event, @Nullable ResolvableType eventType) { // 包装ApplicationEvent ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType; } } // 考虑到部分事件在Listener注册之前就发布了,因此先保存起来 if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster.multicastEvent(applicationEvent, eventType); } // 同时给父容器发布事件 if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } }}

也就是事件转发依赖ApplicationEventMulticaster对象。容器初始化过程中会先完成ApplicationEventMulticaster对象的初始化,再注册Listener。

text复制代码// AbstractApplicationContextpublic void refresh { prepareRefresh; ... initApplicationEventMulticaster; onRefresh; registerListeners; // 注册监听器 ...}

如果容器中已经注册有名为applicationEventMulticaster的Bean,则使用它(我们可以通过这种方式注册自定义的ApplicationEventMulticaster,比如添加线程处理器),否则默认使用SimpleApplicationEventMulticaster作为转发器。

text复制代码public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";protected void initApplicationEventMulticaster { ConfigurableListableBeanFactory beanFactory = getBeanFactory; if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); } else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); }}

将容器注册的Listener和Listener Bean,注册到applicationEventMulticaster。同时开始转发早期收集的事件。

text复制代码protected void registerListeners { for (ApplicationListener listener : getApplicationListeners) { getApplicationEventMulticaster.addApplicationListener(listener); } // 从容器中获取实现ApplicationListener的Bean String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false); for (String listenerBeanName : listenerBeanNames) { getApplicationEventMulticaster.addApplicationListenerBean(listenerBeanName); } // 初始化之前收集的事件,这时候开始转发 Set earlyEventsToProcess = this.earlyApplicationEvents; this.earlyApplicationEvents = null; if (earlyEventsToProcess != null) { for (ApplicationEvent earlyEvent : earlyEventsToProcess) { getApplicationEventMulticaster.multicastEvent(earlyEvent); } }}

关于earlyApplicationEventsAbstractApplicationContext的refresh方法确定了容器初始化的流程,在prepareRefresh中将earlyApplicationEvents初始化为空列表,再此之后初始化过程中发送的事件,都将保存在earlyApplicationEvents中。而后续先初始化applicationEventMulticaster,再注册完监听器,此时会将earlyApplicationEvents中所有事件发送给监听器,然后置为null,至此earlyApplicationEvents的任务就已经完成了。

SimpleApplicationEventMulticaster

SimpleApplicationEventMulticaster 继承自AbstractApplicationEventMulticaster ,后者实现了Listener和Listener Bean注册和移除、查找,前者在此基础上,实现了事件转发的功能。在multicastEvent方法中,遍历所有支持该事件类型的ApplicationListener,调用其onApplicationEvent方法。SimpleApplicationEventMulticaster 允许用户设置Executor,使用Executor执行事件处理程序。

java复制代码// SimpleApplicationEventMulticaster@Overridepublic void multicastEvent(ApplicationEvent event) { multicastEvent(event, resolveDefaultEventType(event));}@Overridepublic void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor; for (ApplicationListener listener : getApplicationListeners(event, type)) { if (executor != null) { executor.execute( -> invokeListener(listener, event)); } else { // 执行 listener.onApplicationEvent(event); invokeListener(listener, event); } }}

AbstractApplicationEventMulticasterdefaultRetrieverListenerRetriever 类型)负责维护两个列表applicationListeners 和 applicationListenerBeans。Listener和Listener Bean的注册和移除都是对这两个列表的新增和删除。(为什么需要Listener Bean?支持1. 非singleton bean;2. FactoryBean)

java复制代码@Overridepublic void addApplicationListener(ApplicationListener listener) { synchronized (this.retrievalMutex) { Object singletonTarget = AopProxyUtils.getSingletonTarget(listener); if (singletonTarget instanceof ApplicationListener) { this.defaultRetriever.applicationListeners.remove(singletonTarget); } this.defaultRetriever.applicationListeners.add(listener); this.retrieverCache.clear; }}@Overridepublic void removeApplicationListener(ApplicationListener listener) { synchronized (this.retrievalMutex) { this.defaultRetriever.applicationListeners.remove(listener); this.retrieverCache.clear; }}

每次事件转发需要查询支持该事件类型的ApplicationListener:从defaultRetriever 维护的applicationListeners和applicationListenerBeans找到适用类型的ApplicationListener对象。最后按照Order优先级排序,并返回。

java复制代码// 对于listenerBean来说,需要从容器中获取对应的Bean对象Class listenerType = beanFactory.getType(listenerBeanName);if (listenerType == null || supportsEvent(listenerType, eventType)) { ApplicationListener listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) { allListeners.add(listener); }}

为了避免每次查询遍历一遍,使用retrieverCache作为缓存,这样对于相同的事件类型,可以直接返回对应的ApplicationListener列表。这也是为什么每次Listener和ListenerBean新增或移除,都要清空retrieverCache。

java复制代码protected Collection> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { Object source = event.getSource; Class sourceType = (source != null ? source.getClass : null); ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType); // 直接查询缓存 ListenerRetriever retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners; } if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass, this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) { synchronized (this.retrievalMutex) { retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners; } retriever = new ListenerRetriever(true); Collection> listeners = retrieveApplicationListeners(eventType, sourceType, retriever); this.retrieverCache.put(cacheKey, retriever); return listeners; } } else { // 不能安全使用缓存 return retrieveApplicationListeners(eventType, sourceType, null); }}

@EventListener & EventListenerMethodProcessor

AnnotationConfigUtils给容器提供了一些默认的注解处理器。包括EventListenerMethodProcessor方法注解处理器和默认的EventListenerFactory实现。

java复制代码// AnnotationConfigUtils#registerAnnotationConfigProcessorsif (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) { RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class); def.setSource(source); beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME));}if (!registry.containsBeanDefinition(EVENT_LISTENER_FACTORY_BEAN_NAME)) { RootBeanDefinition def = new RootBeanDefinition(DefaultEventListenerFactory.class); def.setSource(source); beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_FACTORY_BEAN_NAME));}

EventListenerMethodProcessor做了以下几件事:

从Bean容器中获取所有EventListenerFactory接口的实现(order排序)

在容器初始化单例对象结束后,遍历所有容器中bean的类,通过反射解析出该类中@EventListener注解的方法

将这些方法通过适合的EventListenerFactory调用

createApplicationListener 包装成ApplicationListener对象,并注册到ApplicationContext

java复制代码// 1. 从Bean容器中获取所有EventListenerFactory接口的实现@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) { this.beanFactory = beanFactory; Map beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false); List factories = new ArrayList<>(beans.values); AnnotationAwareOrderComparator.sort(factories); this.eventListenerFactories = factories;}// 2. 在容器初始化单例对象结束后,遍历所有容器中bean的类@Overridepublic void afterSingletonsInstantiated { ConfigurableListableBeanFactory beanFactory = this.beanFactory; String[] beanNames = beanFactory.getBeanNamesForType(Object.class); for (String beanName : beanNames) { Class type = AutoProxyUtils.determineTargetClass(beanFactory, beanName); if (type != null) { processBean(beanName, type); } }}private void processBean(final String beanName, final Class targetType) { if (!this.nonAnnotatedClasses.contains(targetType)) { // 2.1 通过反射解析出该类中@EventListener注解的方法 Map annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); if (CollectionUtils.isEmpty(annotatedMethods)) { this.nonAnnotatedClasses.add(targetType); } else { ConfigurableApplicationContext context = this.applicationContext; List factories = this.eventListenerFactories; for (Method method : annotatedMethods.keySet) { for (EventListenerFactory factory : factories) { // 3. 将这些方法通过适合的EventListenerFactory调用createApplicationListener // 包装成ApplicationListener对象 if (factory.supportsMethod(method)) { Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); ApplicationListener applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse); if (applicationListener instanceof ApplicationListenerMethodAdapter) { ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); } // 3.1 注册到ApplicationContext context.addApplicationListener(applicationListener); break; } } } } }}

EventListenerFactory

DefaultEventListenerFactory是默认的EventListenerFactory实现,支持所有方法。最后将方法包装为ApplicationListenerMethodAdapter对象。

java复制代码public class DefaultEventListenerFactory implements EventListenerFactory, Ordered { @Override public int getOrder { return LOWEST_PRECEDENCE; } @Override public boolean supportsMethod(Method method) { return true; } @Override public ApplicationListener createApplicationListener(String beanName, Class type, Method method) { return new ApplicationListenerMethodAdapter(beanName, type, method); }}

ApplicationListenerMethodAdatper

ApplicationListenerMethodAdapter使用了适配器的设计模式

从方法的@EventListener注解(或方法参数)中获取需要监听的事件类型,以及条件表达式,从方法的@Order注解中获取优先级信息。

supportsEventType 方法提供事件类型判断

onApplicationEvent 方法收到事件对象时:

通过方法支持的参数(是否无参)以及事件对象是否为

PayloadApplicationEvent 类型解析出方法调用需要的参数信息

根据注解的条件表达式,判断是否可以执行方法调用

执行方法调用

将方法调用的结果作为事件对象发布出去

java复制代码public void processEvent(ApplicationEvent event) { // 通过事件对象解析方法参数 Object[] args = resolveArguments(event); // 条件表达式判断 if (shouldHandle(event, args)) { // 反射执行 Object result = doInvoke(args); if (result != null) { // 将结果作为事件发布 handleResult(result); } }}

总结

Spring Event使用ApplicationEventPublisher发布事件,通过实现ApplicationListener监听事件。这种模式可以解耦复杂的业务依赖。用@EventListener或@TransactionalEventListener注解处理函数,可以提供更复杂的条件过滤,或者事务状态感知,支持业务实现更复杂的事件处理。

从源码实现方面,AbstractApplicationContext在refresh过程中初始化ApplicationEventMulticaster(SimpleApplicationEventMulticaster)和ApplicationListener,并向ApplicationEventMulticaster注册监听器。这样在发布事件后,ApplicationEventMulticaster能够找到类型匹配的ApplicationListener,并执行调用。

对于@EventListener或@TransactionalEventListener,则依赖方法处理器EventListenerMethodProcessor遍历所有bean并找出被注解的方法,用对应的EventListenerFactory(DefaultEventListenerFactory或者TransactionalEventListenerFactory)将该方法包装为ApplicationListener。对于@TransactionalEventListener来说,需要额外通过事务同步器感知事务状态变动。

作者:喜马Tech 晏子赢

转载此文是出于传递更多信息目的。若来源标注错误或侵犯了您的合法权益,请与本站联系,我们将及时更正、删除、谢谢。
https://www.414w.com/read/414994.html
0
最新回复(0)