0%

spring事件监听机制

spring事件监听机制

介绍

spring事件监听机制实际上就是一个典型的观察者模式,在观察者模式的基础之上进行的抽象和处理。使得开发者可以根据自己的业务特点依附于spring容器进行事件的注册、发布、处理。

简单使用

1.创建一个类继承于顶层事件类ApplicationEvent

1
2
3
4
5
6
7
8
9
public class SendNotificationEvent extends ApplicationEvent {

private SendNotificationVo sendNotificationVo;

public SendNotificationEvent(ApplicationContext source, SendNotificationVo sendNotificationVo) {
super(source);
this.sendNotificationVo = sendNotificationVo;
}
}

2.监听到对应的事件后的业务处理。

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class EventListenerHandler {

@EventListener
public void sendNotificationEventListener(SendNotificationEvent event) {
SendNotificationVo sendNotificationVo = event.getSendNotificationVo();
// 具体的监听到事件之后的业务处理。。
}
}

3.调用点发送具体的事件。

1
2
3
4
5
6
7
8
public static void main(String[] args) {
SendNotificationVo sendNotificationVo = SendNotificationVo.builder()
.id("123")
.name("tom")
.status("200").build();
SendNotificationEvent event = new SendNotificationEvent(springApplicationUtil.applicationContext, sendNotificationVo);
springApplicationUtil.applicationContext.publishEvent(event);
}

注意:以上就是最简单的spring事件监听的使用。在具体的应用场景中,并不会这么简单的使用,因为若在业务逻辑上需要解耦,大部分还是希望是异步的方式进行事件的处理,然而在默认的情况下,这种模式是同步机制,也就是说待到具体的事件监听处理完成之后,才会继续执行调用点的业务逻辑。

异步方式

1.广播器异步

​ 在spring的事件监听机制中已经考虑到异步的情况,所以在事件发送器发送事件时,会判断是否存在广播器,当存在广播器时,会将具体的监听执行逻辑转移到广播器对应的线程池中。来跟踪一下源码。

1
2
3
4
5
6
7
8
9
10
11
@FunctionalInterface
public interface ApplicationEventPublisher {

default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}

// 具体子类实现接口
void publishEvent(Object event);

}

具体的实现在AbstractApplicationContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public void publishEvent(Object event) {
publishEvent(event, null);
}

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");

// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}

// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 获取广播器, 并调用广播器对应的发送事件处理
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}

// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}

默认仅有一个广播器的实现SimpleApplicationEventMulticaster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 获取对应的广播器线程池
Executor executor = getTaskExecutor();
// 获取这个event对应类型的所有监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 若是配置了线程池, 则将监听任务转移到线程池中执行
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}else {
// 若是没有配置线程池, 执行执行监听任务, 所以在默认情况下与具体的业务逻辑是同步执行。
invokeListener(listener, event);
}
}
}

具体事件监听的执行逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
// 具体执行监听逻辑
doInvokeListener(listener, event);
}catch (Throwable err) {
errorHandler.handleError(err);
}
}else {
doInvokeListener(listener, event);
}
}

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 实际上就是ApplicationListener的onApplicationEvent方法
listener.onApplicationEvent(event);
}catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}else {
throw ex;
}
}
}

刚刚上面介绍的最简单的使用方式中采用的@EventListener的方式来标记监听器的位置,实际上在初始化这个bean对象时,扫描到@EventListener后会将这个对应的方式转化为ApplicationListenerMethodAdapter类,该类中包含了bean名称、类名称、监听处理method等等,待到接收到事件时,通过反射调用对应的监听处理方法。

2.@Async注解异步

​ 虽然在事件发送器中内置了广播器线程池,但是若不进行配置,则它还是同步的方式执行,在它同步执行的基础上,若是利用spring的异步机制,也可以达到异步的效果。

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class EventListenerHandler {
@Async
@EventListener
public void sendNotificationEventListener(SendNotificationEvent event) {
SendNotificationVo sendNotificationVo = event.getSendNotificationVo();
// 具体的监听到事件之后的业务处理。。
}
}

在这种情况下,在spring容器初始化时,扫描到这个bean对象并进行初始化时,会为这个bean创建一个代理类,由这个代理类来执行相应的异步逻辑。

事务

以上看似已经解决的异步的问题,但是在实际的使用过程中又发现如果事件发送点存在事务管理,就会导致事件中获取不到事件发送点的某些数据。(由于事件监听处理触发时,事件发送点还未提交事务。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 伪代码, 在事件监听的处理中, 通过id=123可能存在获取不到这条数据的情况
*
* @Author: xiaocainiaoya
* @Date: 2021/09/12 13:54:31
* @return:
**/
@Transactional
public void send() {
SendNotificationVo sendNotificationVo = SendNotificationVo.builder()
.id("123")
.name("tom")
.status("200").build();
mapper.insert(sendNotificationVo);
SendNotificationEvent event = new SendNotificationEvent(springApplicationUtil.applicationContext, sendNotificationVo);
springApplicationUtil.applicationContext.publishEvent(event);
}

但是、但是、但是这种情况spring也考虑到了,spring监听机制中通过使用@TransactionalEventListener

来解决这个问题。@TransactionalEventListener它的元注解为@EventListener,所以本质上也是个@EventListener注解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 元注解
@EventListener
public @interface TransactionalEventListener {

/**
* 事件触发阶段: 比如事务提交之前、事务提交之后等, 默认是在事务提交之后
*/
TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

/**
* 若调用点无事务管理也触发, 默认情况下若调用点无事务接管, 该监听处理不会触发
*/
boolean fallbackExecution() default false;

/**
* Alias for {@link #classes}.
*/
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};

/**
* The event classes that this listener handles.
*/
@AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};

String condition() default "";

}

刚刚上面说到在spring扫描到对应的监听器处理bean时,会根据方法上标记的注解将监听器转换为对应的处理类。根据不同的两个注解@TransactionalEventListenerEventListener对应两个不同的生成监听类工厂DefaultEventListenerFactoryTransactionalEventListenerFactory,由它们来创建具体的监听处理类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) && AnnotationUtils.isCandidateClass(targetType, EventListener.class) && !isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
// 从bean的class对象中找出含有@EventListener注解的方法, 存在Map<Method, EventListener>中
// @TransactionListener方法也会被匹配, 因为它的元注解是@EventListener
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}catch (Throwable ex) {
// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
}
}

if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
if (logger.isTraceEnabled()) {
logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
}
}else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
// 获取监听工厂, 这里有两个工厂:DefaultEventListenerFactory和TransactionalEventListenerFactory
for (EventListenerFactory factory : factories) {
// 判断这个被标记的方法适配哪个工厂
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
// 使用工厂创建对应的监听器对象
ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" + beanName + "': " + annotatedMethods);
}
}
}
}

而这两个工厂生成出来的监听类,实际上是两个适配器,ApplicationListenerMethodAdapterApplicationListenerMethodTransactionalAdapter,由这两个适配器来执行相应的处理逻辑。这里要感叹下spring设计的精妙,一环扣一环,扩展性极强。

这里分析下ApplicationListenerMethodTransactionalAdapter中对应的监听触发方法onApplicationEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) {
// publish事件时: 创建一个TransactionSynchronization对象, 这个对象持有event
// 创建TransactionSynchronizationEventAdapter
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
// 注册到事务管理器中
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
}else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
// 反射调用进行事件的处理
processEvent(event);
}else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}

本以为采用这种方式之后,就解决了对应的异步+调用点事务的问题。在测试中发现:若采用广播器实现异步,极大可能获取不到调用点事务内数据;而采用@Async实现异步百分百可以获取到调用点事务内数据。

简单跟踪发现:

  • 广播器方式实现异步,是将onApplicationEvent方法的触发丢入线程池。
  • @Async方式实现异步,走下方else逻辑,在事件发送器中走同步逻辑,是直接执行onApplicationEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
// invokeListener()返回最后的逻辑是去调用ApplicationListener.onApplicationEvent()
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}

这里的是否对onApplicationEvent方法执行执行起到了关键性的作用,因为在事务监听处理器适配器中会判断是否是否存在事务。第一种情况,由线程池内线程来执行该方法,这时事务是绑定在原线程上,所以会导致这个判断结果为false。第二种情况,由事件发送线程执行该方法,这时与事务在同一线程,则这个判断的结果为true,将对应的事件处理方法注册到事务管理器中,待到执行改事件监听处理方法时,是异步进行处理的。

1
2
3
4
5
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) {
// 省略以下代码
}

总结

​ 整体使用下来,发现其中的道道还是很多的,这需要对所有的组合情况、问题情况、原理都掌握的情况下,否则随意组合,可能在某一场景下能达到需要的效果,但是就像是埋下了定时炸弹。当然了spring的事件监听机制毕竟只是基于内存,若对应的生产环境并没有升级停机钩子处理,或者是金丝雀升级等方式,需停机升级,有可能会导致部分监听未执行的情况,所以建议生产环境还是通过一些mq组件进行发布监听事件的处理。

-------------本文结束感谢您的阅读-------------