前言
最近公司内部准备搞消息驱动,即在订单系统上使用消息/事件驱动的设计,这里研究了一下进程内的 事件驱动设计,主要分析了业内常用的Spring Event
和EventBus
,本篇博文目的如下:
- 熟悉事件驱动的设计思想
- 会在项目中使用事件驱动(spring event和 eventbus)
- spring event和 eventbus 的异同点
QucikStart
Spring Event
三要素:
Event
(事件)Publisher
(发布者)Listener
(监听者)
举例场景
用户下订单,推送发短信通知,发微信通知。
创建一个 Event
直接继承ApplicationEvent
即可。
示例代码:
public class CreateOrderEvent extends ApplicationEvent {
public CreateOrderEvent(Object source) {
super(source);
}
public Order getOrder() {
return (Order) this.source;
}
}
补充:这里事件也可以不继承ApplicationEvent
,因为发布接口有提供void publishEvent(Object event);
创建一个 Publisher
使用ApplicationEventPublisher
发布即可。
示例代码:
@Slf4j
@Component
public class EventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publishCreateOrderEvent(CreateOrderEvent createOrderEvent) {
log.info("发布了一个[订单创建]事件:{}", createOrderEvent);
applicationEventPublisher.publishEvent(createOrderEvent);
}
}
监听事件
监听者的创建可以使用两种方式:
- 实现
ApplicationListener
接口 - 基于注解式的监听(推荐,要求 Spring 4.2 + )
使用实现ApplicationListener
接口的方式示例:
@Slf4j
@Component
public class DuanXinNoticeListener implements ApplicationListener<CreateOrderEvent> {
@Override
public void onApplicationEvent(CreateOrderEvent event) {
log.info("实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
}
}
这种方式不太优雅,每次都要实现一次接口不太爽。那么推荐使用使用注解式监听器。
注解式监听只需要在监听方法逻辑上使用注解@EventListener
即可,而且注解支持spel表达式。
注解式示例:
@Slf4j
@Component
public class CreateOrderListener {
@EventListener(condition = "#createOrderEvent.order.status == 1")
public void processCreateOrderEvent(CreateOrderEvent createOrderEvent) {
log.info("注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}
@EventListener(condition = "#createOrderEvent.order.status == 2")
public void processCreateOrderEvent2(CreateOrderEvent createOrderEvent) {
log.info("注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}
}
使用事件发布
事件在控制器里使用:
@Slf4j
@RestController
@RequestMapping("/event")
public class EventDemoController {
@Autowired
private EventPublisher createOrderEventPublisher;
@PostMapping("/spring/createOrder")
public String createOrder(Order order) {
createOrderEventPublisher.publishCreateOrderEvent(new CreateOrderEvent(order));
return "ok";
}
}
测试创建订单事件
发送请求:
curl -X POST \
http://localhost:9090/event/spring/createOrder \
-H 'cache-control: no-cache' \
-H 'content-type: application/x-www-form-urlencoded' \
-H 'postman-token: a632443a-b7b2-40df-6bc3-3881babd15e3' \
-d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=1&userName=%E5%BC%A0%E4%B8%89'
可以看到控制台打印:
2019-02-21 10:53:20.750 INFO 24168 --- [nio-9090-exec-3] c.gemantic.wealth.spring.EventPublisher : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=1, userName=张三))
2019-02-21 10:53:20.750 INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.CreateOrderListener : 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:1 ; 开始处理相应的事件。
2019-02-21 10:53:20.750 INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.DuanXinNoticeListener : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 10:53:20.750 INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.WeiXinNoticeListener : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
事件已被执行,把status
改为2:
2019-02-21 10:55:51.951 INFO 24168 --- [nio-9090-exec-5] c.gemantic.wealth.spring.EventPublisher : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=2, userName=张三))
2019-02-21 10:55:51.951 INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.CreateOrderListener : 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:2 ; 开始处理相应的事件。
2019-02-21 10:55:51.951 INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.DuanXinNoticeListener : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 10:55:51.951 INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.WeiXinNoticeListener : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
可见可以很方便的根据条件执行监听。
补充1: 使用异步事件处理
可以在监听方法上使用注解@Async
,前提是需要spring应用启动异步功能:@EnableAsync
补充2: TransactionalEventListener实现事务监控
我们知道,比如如上“发送短信”,“发送微信”等通知是需要在“订单入库”的基础上才能执行,也就是说“订单入库”完成,才能处理执行相应的监听器的逻辑。
使用@TransactionalEventListener
即可完成如上控制,可参考: spring官方文档 Transaction-bound Events
注意一点,官方文档中有提出:
If no transaction is running, the listener is not invoked at all, since we cannot honor the required semantics. You can, however, override that behavior by setting the fallbackExecution attribute of the annotation to true.
也就是说,实现事务控制必须开启事务
并且处于一个事务中,否者该监听器不会被调用。(不过可以设置fallbackExecution=true
)
前提条件:
- 开启事务管理:
@EnableTransactionManagement
- 配置事务管理器:
PlatformTransactionManager
(这一步@EnableTransactionManagement
已默认实现,如须按需配置可手动配置) - 使用
@Transactional
处理一个事务
示例代码如下(前提已开启事务):
订单服务-“订单入库”:
@Slf4j
@Service
public class OrderService {
@Autowired
private EventPublisher createOrderEventPublisher;
/**
* 插入订单表操作
*/
@Transactional(rollbackFor = Exception.class)
public void insert(Order order) {
log.info("[订单入库] start");
createOrderEventPublisher.publishCreateOrderEvent(new CreateOrderEvent(order));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("[订单入库] end");
}
}
两个新的监听器,一个不是事务的,一个是事务的做对比:
@EventListener(condition = "#createOrderEvent.order.status == 3")
public void processCreateOrderEvent3(CreateOrderEvent createOrderEvent) {
log.info("[EventListener] 注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}
@TransactionalEventListener(condition = "#createOrderEvent.order.status == 3", phase = TransactionPhase.AFTER_COMMIT)
public void processCreateOrderEvent4(CreateOrderEvent createOrderEvent) {
log.info("[TransactionalEventListener] 注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}
控制器里的事务调用请求处理:
@PostMapping("/spring/createOrder_transaction")
public String createOrder_transaction(Order order) {
orderService.insert(order);
return "ok";
}
开启服务发请求测试:
curl -X POST \
http://localhost:9090/event/spring/createOrder_transaction \
-H 'cache-control: no-cache' \
-H 'content-type: application/x-www-form-urlencoded' \
-H 'postman-token: 1e9d2357-ba51-6aa6-156b-6fe2559cdb06' \
-d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=3&userName=%E5%BC%A0%E4%B8%89'
结果如下:
2019-02-21 14:07:31.467 INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.service.OrderService : [订单入库] start
2019-02-21 14:07:31.468 INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.spring.EventPublisher : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=3, userName=张三))
2019-02-21 14:07:31.482 INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.CreateOrderListener : [EventListener] 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:3 ; 开始处理相应的事件。
2019-02-21 14:07:31.483 INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.DuanXinNoticeListener : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 14:07:31.483 INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.WeiXinNoticeListener : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 14:07:36.483 INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.service.OrderService : [订单入库] end
2019-02-21 14:07:36.484 INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.CreateOrderListener : [TransactionalEventListener] 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:3 ; 开始处理相应的事件。
请求大约5秒后返回,可以看到控制台,EventListener
是直接处理的,TransactionalEventListener
是在订单insert
方法调用后再处理。这是因为TransactionalEventListener
默认的处理是事务commit
之后处理的,这里可以改注解的phase
属性。参考枚举类:TransactionPhase
。
public enum TransactionPhase {
BEFORE_COMMIT,
AFTER_COMMIT,
AFTER_ROLLBACK,
AFTER_COMPLETION
}
补充3: 事件的父类监听
如果事件继承了某一父类,此父类也有监听,则每次发布事件,父类子类的监听器都会执行。
Guava EventBus
Google的进程内的发布订阅模式的实现,轻量级消息系统。 可参考:
那么场景依然是订单与通知场景,下面示例代码为集成在spring中的示例。
依赖guava包
maven引入:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
创建一个 Event
普通类即可以。
@Data
@AllArgsConstructor
public class CreateOrderEvent {
private Order order;
}
创建一个 Publisher
发布者即 EventBus
类,因为EventBus
不是单例的,我们使用中又想用单例,创建一个事件管理器类。
/**
* 事件管理器:统一管理所有事件
*/
public class EventBusManager {
/**
* 创建订单
*/
public final static EventBus CREATE_ORDER = new EventBus();
}
监听事件
可以优雅的使用@Subscribe
表示监听某一事件,这个注解没有其他属性。
除此之外,监听器需要手动注册到EventBus
中类如:EventBus.register(listener)
。
方便起见,我们可以在spring
容器初始化时注册监听器,并抽象之。
/**
* 抽象的监听器
*/
public abstract class AbstractBusListener {
/**
* 统一在spring启动时注册事件
*/
@PostConstruct
public void init() {
registerEventBus();
}
private void registerEventBus() {
getEventBus().register(this);
}
protected abstract EventBus getEventBus();
}
@Slf4j
@Component
public class CreateOrderBusListener extends AbstractBusListener {
/**
* 订单创建事件:短信通知
*/
@Subscribe
public void onCreateOrderEvent2DuanxinNotice(CreateOrderEvent event) {
log.info("收到[创建订单]事件。[短信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
}
/**
* 订单创建事件:微信通知
*/
@Subscribe
public void onCreateOrderEvent2WeixinNotice(CreateOrderEvent event) {
log.info("收到[创建订单]事件。[微信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
}
/**
* 注册的为订单创建事件
*/
@Override
protected EventBus getEventBus() {
return EventBusManager.CREATE_ORDER;
}
}
发布事件
发布事件也是通过EventBus
发布,通过其post(event)
方法发布。
示例:
@PostMapping("/eventbus/createOrder")
public String createOrderByEventBus(Order order) {
EventBusManager.CREATE_ORDER.post(new com.gemantic.wealth.eventbus.CreateOrderEvent(order));
return "ok";
}
测试创建订单事件
发送请求:
curl -X POST \
http://localhost:9090/event/eventbus/createOrder \
-H 'cache-control: no-cache' \
-H 'content-type: application/x-www-form-urlencoded' \
-H 'postman-token: 6b5255d5-2b1d-e9c1-7ef3-a5678f7a73ad' \
-d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=1&userName=%E5%BC%A0%E4%B8%89'
看控制台打印:
2019-02-21 17:34:04.276 INFO 24556 --- [nio-9090-exec-1] c.g.w.eventbus.CreateOrderBusListener : 收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 17:34:04.276 INFO 24556 --- [nio-9090-exec-1] c.g.w.eventbus.CreateOrderBusListener : 收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
补充1:使用异步事件处理
EventBus
实现上改为AsyncEventBus
即可,构造器内需要再配一个线程池。举例:
public final static EventBus CREATE_ORDER = new AsyncEventBus(Executors.newFixedThreadPool(5));
补充2:DeadEvent
介绍:
Wraps an event that was posted, but which had no subscribers and thus could not be delivered. Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system’s event distribution.
所有没被监听的事件都会被包装成一个DeadEvent
。这种情况,只需要提供一个监听DeadEvent
事件的监听器即可统一处理
补充3: 事件的父类监听
如果事件继承了某一父类,此父类也有监听,则每次发布事件,父类子类的监听器都会执行。
补充4: @AllowConcurrentEvents
描述:
Marks an event subscriber method as being thread-safe. This annotation indicates that EventBus may invoke the event subscriber simultaneously from multiple threads. This does not mark the method, and so should be used in combination with Subscribe.
创建监听器时会根据此注解判断创建普通监听者还是同步的监听者:
class Subscriber {
/**
* Creates a {@code Subscriber} for {@code method} on {@code listener}.
*/
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
...
}
同步的监听者则会加锁同步调用订阅方法:
@VisibleForTesting
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
加锁会有一定的开销,所以如果是同步的EventBus
监听者或者是你确认线程安全的AsyncEventBus
监听者,最好标注注解@AllowConcurrentEvents
事件处理异常
– | Spring Event | Guava EventBus |
---|---|---|
同步 | 会对其他监听器有影响 | 不影响其他监听器的处理 |
异步 | 不影响其他监听器的处理 | 不影响其他监听器的处理 |
针对同步的情况单独说下。
EventBus
事件处理逻辑,无论同步异步都是用线程执行,区别在于同步的情况使用的是同一个线程执行。
final void dispatchEvent(final Object event) {
this.executor.execute(new Runnable() {
public void run() {
try {
Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
}
});
}
this.executor是:
public static Executor directExecutor() {
return MoreExecutors.DirectExecutor.INSTANCE;
}
可以看到EventBus
把异常都统一catch了,异常不上抛,别的监听器依然正常处理。
Spring
Spring调用的时候的异常都是往外抛的:
protected Object doInvoke(Object... args) {
Object bean = getTargetBean();
ReflectionUtils.makeAccessible(this.bridgedMethod);
try {
return this.bridgedMethod.invoke(bean, args);
}
catch (IllegalArgumentException ex) {
assertTargetBean(this.bridgedMethod, bean, args);
throw new IllegalStateException(getInvocationErrorMessage(bean, ex.getMessage(), args), ex);
}
catch (IllegalAccessException ex) {
throw new IllegalStateException(getInvocationErrorMessage(bean, ex.getMessage(), args), ex);
}
catch (InvocationTargetException ex) {
// Throw underlying exception
Throwable targetException = ex.getTargetException();
if (targetException instanceof RuntimeException) {
throw (RuntimeException) targetException;
}
else {
String msg = getInvocationErrorMessage(bean, "Failed to invoke event listener method", args);
throw new UndeclaredThrowableException(targetException, msg);
}
}
}
分别适合的场景
异步情况下,都是多线程处理,无差异。
同步情况下,同一线程处理,由于Spring会抛出异常,适合事件有依赖的情况,而EventBus适合事件互不依赖的场景。
举个栗子:
- 下订单场景一。订单已经创建,那么会发送短信通知,微信通知,此二者之间无依赖,适合EventBus.
- 下订单场景二。如果创建订单也作为事件,那么微信短信通知需依赖订单事件已完成,如果创建订单异常失败则不需要发送通知,适合Spring Event
异步实现的区别
EventBus的异步实现
EventBus的异步实现是依赖于线程池的,在创建的时候确定,实际上是在监听器调用方法的时候采用的异步,同一个EventBus
都是同一种处理方式(同步or异步)。
Spring的异步实现
Spring是通过拦截器的机制,被@Async
的方法都会被AsyncExecutionInterceptor
拦截,然后采用多线程的方式调用:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
} else {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future)result).get();
}
} catch (ExecutionException var2) {
AsyncExecutionInterceptor.this.handleError(var2.getCause(), userDeclaredMethod, invocation.getArguments());
} catch (Throwable var3) {
AsyncExecutionInterceptor.this.handleError(var3, userDeclaredMethod, invocation.getArguments());
}
return null;
}
};
// 异步调用
return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
}
}
异步方式小结
综上,还是Spring Event
更加灵活多变,适用场景更多。
Spring Event与 Guava EventBus的比较
– | Spring Event | Guava EventBus |
---|---|---|
Event | 任意对象 | 任意对象 |
Publisher | EventBus | ApplicationEventPublisher |
Subscriber | @EventListener | @Subscribe |
发布方法 | ApplicationEventPublisher#publishEvent | EventBus#post |
注册方法 | spring自动注册 | 手动注册:EventBus#register |
是否支持异步 | 支持。@Async | 支持。AsyncEventBus |
是否支持事务 | 支持。@TransactionalEventListener | 不支持。 |
是否支持条件过滤 | 支持。 | 不支持。 |
是否支持DeadEvent | 不支持。 | 支持。 |
是否支持事件继承 | 支持。 | 支持。 |
同步时是否支持排序 | 支持。 | 不支持。 |
事件异常处理 | 异常上抛 | 捕获异常并不上抛 |
复杂度 | 复杂 | 轻量 |
总结
Spring Event
与Guava EventBus
在设计上略有不同,可根据业务场景按需使用, 总的来说Guava EventBus
比较轻量一些,适合大多数业务场景,而Spring Event
稍微重了,但也开箱可用,与Spring天然一体,多了很多花哨的功能也适合更多的复杂场景。