spring事件监听机制 介绍 spring
事件监听机制实际上就是一个典型的观察者模式,在观察者模式的基础之上进行的抽象和处理。使得开发者可以根据自己的业务特点依附于spring
容器进行事件的注册、发布、处理。
简单使用 1.创建一个类继承于顶层事件类ApplicationEvent
1 2 3 4 5 6 7 8 9 public class SendNotificationEvent extends ApplicationEvent { private SendNotificationVo sendNotificationVo; public SendNotificationEvent (ApplicationContext source, SendNotificationVo sendNotificationVo) { super (source); this .sendNotificationVo = sendNotificationVo; } }
2.监听到对应的事件后的业务处理。
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class EventListenerHandler { @EventListener public void sendNotificationEventListener (SendNotificationEvent event) { SendNotificationVo sendNotificationVo = event.getSendNotificationVo(); } }
3.调用点发送具体的事件。
1 2 3 4 5 6 7 8 public static void main (String[] args) { SendNotificationVo sendNotificationVo = SendNotificationVo.builder() .id("123" ) .name("tom" ) .status("200" ).build(); SendNotificationEvent event = new SendNotificationEvent(springApplicationUtil.applicationContext, sendNotificationVo); springApplicationUtil.applicationContext.publishEvent(event); }
注意: 以上就是最简单的spring
事件监听的使用。在具体的应用场景中,并不会这么简单的使用,因为若在业务逻辑上需要解耦,大部分还是希望是异步的方式进行事件的处理,然而在默认的情况下,这种模式是同步机制,也就是说待到具体的事件监听处理完成之后,才会继续执行调用点的业务逻辑。
异步方式 1.广播器异步 在spring
的事件监听机制中已经考虑到异步的情况,所以在事件发送器发送事件时,会判断是否存在广播器,当存在广播器时,会将具体的监听执行逻辑转移到广播器对应的线程池中。来跟踪一下源码。
1 2 3 4 5 6 7 8 9 10 11 @FunctionalInterface public interface ApplicationEventPublisher { default void publishEvent (ApplicationEvent event) { publishEvent((Object) event); } void publishEvent (Object event) ; }
具体的实现在AbstractApplicationContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public void publishEvent (Object event) { publishEvent(event, null ); } protected void publishEvent (Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null" ); ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this , event); if (eventType == null ) { eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType(); } } if (this .earlyApplicationEvents != null ) { this .earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } if (this .parent != null ) { if (this .parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this .parent).publishEvent(event, eventType); } else { this .parent.publishEvent(event); } } }
默认仅有一个广播器的实现SimpleApplicationEventMulticaster
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override public void multicastEvent (final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { if (executor != null ) { executor.execute(() -> invokeListener(listener, event)); }else { invokeListener(listener, event); } } }
具体事件监听的执行逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 protected void invokeListener (ApplicationListener<?> listener, ApplicationEvent event) { ErrorHandler errorHandler = getErrorHandler(); if (errorHandler != null ) { try { doInvokeListener(listener, event); }catch (Throwable err) { errorHandler.handleError(err); } }else { doInvokeListener(listener, event); } } private void doInvokeListener (ApplicationListener listener, ApplicationEvent event) { try { listener.onApplicationEvent(event); }catch (ClassCastException ex) { String msg = ex.getMessage(); if (msg == null || matchesClassCastMessage(msg, event.getClass())) { Log logger = LogFactory.getLog(getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, ex); } }else { throw ex; } } }
刚刚上面介绍的最简单的使用方式中采用的@EventListener
的方式来标记监听器的位置,实际上在初始化这个bean
对象时,扫描到@EventListener
后会将这个对应的方式转化为ApplicationListenerMethodAdapter
类,该类中包含了bean
名称、类名称、监听处理method
等等,待到接收到事件时,通过反射调用对应的监听处理方法。
2.@Async注解异步 虽然在事件发送器中内置了广播器线程池,但是若不进行配置,则它还是同步的方式执行,在它同步执行的基础上,若是利用spring
的异步机制,也可以达到异步的效果。
1 2 3 4 5 6 7 8 9 10 @Component @Slf4j public class EventListenerHandler { @Async @EventListener public void sendNotificationEventListener (SendNotificationEvent event) { SendNotificationVo sendNotificationVo = event.getSendNotificationVo(); } }
在这种情况下,在spring
容器初始化时,扫描到这个bean
对象并进行初始化时,会为这个bean
创建一个代理类,由这个代理类来执行相应的异步逻辑。
事务 以上看似已经解决的异步的问题,但是在实际的使用过程中又发现如果事件发送点存在事务管理,就会导致事件中获取不到事件发送点的某些数据。(由于事件监听处理触发时,事件发送点还未提交事务。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Transactional public void send () { SendNotificationVo sendNotificationVo = SendNotificationVo.builder() .id("123" ) .name("tom" ) .status("200" ).build(); mapper.insert(sendNotificationVo); SendNotificationEvent event = new SendNotificationEvent(springApplicationUtil.applicationContext, sendNotificationVo); springApplicationUtil.applicationContext.publishEvent(event); }
但是、但是、但是这种情况spring
也考虑到了,spring
监听机制中通过使用@TransactionalEventListener
来解决这个问题。@TransactionalEventListener
它的元注解为@EventListener
,所以本质上也是个@EventListener
注解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @EventListener public @interface TransactionalEventListener { TransactionPhase phase () default TransactionPhase.AFTER_COMMIT ; boolean fallbackExecution () default false ; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {}; @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {}; String condition () default "" ; }
刚刚上面说到在spring
扫描到对应的监听器处理bean
时,会根据方法上标记的注解将监听器转换为对应的处理类。根据不同的两个注解@TransactionalEventListener
和EventListener
对应两个不同的生成监听类工厂DefaultEventListenerFactory
和TransactionalEventListenerFactory
,由它们来创建具体的监听处理类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private void processBean (final String beanName, final Class<?> targetType) { if (!this .nonAnnotatedClasses.contains(targetType) && AnnotationUtils.isCandidateClass(targetType, EventListener.class) && !isSpringContainerClass(targetType)) { Map<Method, EventListener> annotatedMethods = null ; try { annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup<EventListener>) method -> AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); }catch (Throwable ex) { if (logger.isDebugEnabled()) { logger.debug("Could not resolve methods for bean with name '" + beanName + "'" , ex); } } if (CollectionUtils.isEmpty(annotatedMethods)) { this .nonAnnotatedClasses.add(targetType); if (logger.isTraceEnabled()) { logger.trace("No @EventListener annotations found on bean class: " + targetType.getName()); } }else { ConfigurableApplicationContext context = this .applicationContext; Assert.state(context != null , "No ApplicationContext set" ); List<EventListenerFactory> factories = this .eventListenerFactories; Assert.state(factories != null , "EventListenerFactory List not initialized" ); for (Method method : annotatedMethods.keySet()) { for (EventListenerFactory factory : factories) { if (factory.supportsMethod(method)) { Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse); if (applicationListener instanceof ApplicationListenerMethodAdapter) { ((ApplicationListenerMethodAdapter) applicationListener).init(context, this .evaluator); } context.addApplicationListener(applicationListener); break ; } } } if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" + beanName + "': " + annotatedMethods); } } } }
而这两个工厂生成出来的监听类,实际上是两个适配器,ApplicationListenerMethodAdapter
和ApplicationListenerMethodTransactionalAdapter
,由这两个适配器来执行相应的处理逻辑。这里要感叹下spring
设计的精妙,一环扣一环,扩展性极强。
这里分析下ApplicationListenerMethodTransactionalAdapter
中对应的监听触发方法onApplicationEvent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public void onApplicationEvent (ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); }else if (this .annotation.fallbackExecution()) { if (this .annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) { logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase" ); } processEvent(event); }else { if (logger.isDebugEnabled()) { logger.debug("No transaction is active - skipping " + event); } } }
本以为采用这种方式之后,就解决了对应的异步+调用点事务的问题。在测试中发现:若采用广播器实现异步,极大可能获取不到调用点事务内数据;而采用@Async
实现异步百分百可以获取到调用点事务内数据。
简单跟踪发现:
广播器方式实现异步,是将onApplicationEvent
方法的触发丢入线程池。
@Async
方式实现异步,走下方else
逻辑,在事件发送器中走同步逻辑,是直接执行onApplicationEvent
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void multicastEvent (final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { if (executor != null ) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } }
这里的是否对onApplicationEvent
方法执行执行起到了关键性的作用,因为在事务监听处理器适配器中会判断是否是否存在事务。第一种情况,由线程池内线程来执行该方法,这时事务是绑定在原线程上,所以会导致这个判断结果为false
。第二种情况,由事件发送线程执行该方法,这时与事务在同一线程,则这个判断的结果为true
,将对应的事件处理方法注册到事务管理器中,待到执行改事件监听处理方法时,是异步进行处理的。
1 2 3 4 5 @Override public void onApplicationEvent (ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive() && TransactionSynchronizationManager.isActualTransactionActive()) { }
总结 整体使用下来,发现其中的道道还是很多的,这需要对所有的组合情况、问题情况、原理都掌握的情况下,否则随意组合,可能在某一场景下能达到需要的效果,但是就像是埋下了定时炸弹。当然了spring
的事件监听机制毕竟只是基于内存,若对应的生产环境并没有升级停机钩子处理,或者是金丝雀升级等方式,需停机升级,有可能会导致部分监听未执行的情况,所以建议生产环境还是通过一些mq
组件进行发布监听事件的处理。