Fork me on GitHub

事件驱动SpringEvent与EventBus

前言

最近公司内部准备搞消息驱动,即在订单系统上使用消息/事件驱动的设计,这里研究了一下进程内的 事件驱动设计,主要分析了业内常用的Spring EventEventBus,本篇博文目的如下:

  • 熟悉事件驱动的设计思想
  • 会在项目中使用事件驱动(spring event和 eventbus)
  • spring event和 eventbus 的异同点

QucikStart

Spring Event

spring event官方文档索引

三要素:

  • Event(事件)
  • Publisher(发布者)
  • Listener(监听者)

举例场景

用户下订单,推送发短信通知,发微信通知。

创建一个 Event

直接继承ApplicationEvent即可。

示例代码:

public class CreateOrderEvent extends ApplicationEvent {

    public CreateOrderEvent(Object source) {
        super(source);
    }

    public Order getOrder() {
        return (Order) this.source;
    }
}

补充:这里事件也可以不继承ApplicationEvent,因为发布接口有提供void publishEvent(Object event);

创建一个 Publisher

使用ApplicationEventPublisher发布即可。

示例代码:

@Slf4j
@Component
public class EventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publishCreateOrderEvent(CreateOrderEvent createOrderEvent) {
        log.info("发布了一个[订单创建]事件:{}", createOrderEvent);
        applicationEventPublisher.publishEvent(createOrderEvent);
    }
}

监听事件

监听者的创建可以使用两种方式:

  • 实现ApplicationListener接口
  • 基于注解式的监听(推荐,要求 Spring 4.2 + )

使用实现ApplicationListener接口的方式示例:

@Slf4j
@Component
public class DuanXinNoticeListener implements ApplicationListener<CreateOrderEvent> {
    @Override
    public void onApplicationEvent(CreateOrderEvent event) {
        log.info("实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
    }
}

这种方式不太优雅,每次都要实现一次接口不太爽。那么推荐使用使用注解式监听器。

注解式监听只需要在监听方法逻辑上使用注解@EventListener即可,而且注解支持spel表达式。

注解式示例:

@Slf4j
@Component
public class CreateOrderListener {

    @EventListener(condition = "#createOrderEvent.order.status == 1")
    public void processCreateOrderEvent(CreateOrderEvent createOrderEvent) {
        log.info("注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
    }

    @EventListener(condition = "#createOrderEvent.order.status == 2")
    public void processCreateOrderEvent2(CreateOrderEvent createOrderEvent) {
        log.info("注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
    }
}

使用事件发布

事件在控制器里使用:

@Slf4j
@RestController
@RequestMapping("/event")
public class EventDemoController {

    @Autowired
    private EventPublisher createOrderEventPublisher;

    @PostMapping("/spring/createOrder")
    public String createOrder(Order order) {
        createOrderEventPublisher.publishCreateOrderEvent(new CreateOrderEvent(order));
        return "ok";
    }

}

测试创建订单事件

发送请求:

curl -X POST \
  http://localhost:9090/event/spring/createOrder \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/x-www-form-urlencoded' \
  -H 'postman-token: a632443a-b7b2-40df-6bc3-3881babd15e3' \
  -d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=1&userName=%E5%BC%A0%E4%B8%89'

可以看到控制台打印:

2019-02-21 10:53:20.750  INFO 24168 --- [nio-9090-exec-3] c.gemantic.wealth.spring.EventPublisher  : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=1, userName=张三))
2019-02-21 10:53:20.750  INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.CreateOrderListener    : 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:1 ; 开始处理相应的事件。
2019-02-21 10:53:20.750  INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.DuanXinNoticeListener  : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 10:53:20.750  INFO 24168 --- [nio-9090-exec-3] c.g.wealth.spring.WeiXinNoticeListener   : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建

事件已被执行,把status改为2:

2019-02-21 10:55:51.951  INFO 24168 --- [nio-9090-exec-5] c.gemantic.wealth.spring.EventPublisher  : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=2, userName=张三))
2019-02-21 10:55:51.951  INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.CreateOrderListener    : 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:2 ; 开始处理相应的事件。
2019-02-21 10:55:51.951  INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.DuanXinNoticeListener  : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 10:55:51.951  INFO 24168 --- [nio-9090-exec-5] c.g.wealth.spring.WeiXinNoticeListener   : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建

可见可以很方便的根据条件执行监听。

补充1: 使用异步事件处理

可以在监听方法上使用注解@Async,前提是需要spring应用启动异步功能:@EnableAsync

补充2: TransactionalEventListener实现事务监控

我们知道,比如如上“发送短信”,“发送微信”等通知是需要在“订单入库”的基础上才能执行,也就是说“订单入库”完成,才能处理执行相应的监听器的逻辑。

使用@TransactionalEventListener即可完成如上控制,可参考: spring官方文档 Transaction-bound Events

注意一点,官方文档中有提出:

If no transaction is running, the listener is not invoked at all, since we cannot honor the required semantics. You can, however, override that behavior by setting the fallbackExecution attribute of the annotation to true.

也就是说,实现事务控制必须开启事务并且处于一个事务中,否者该监听器不会被调用。(不过可以设置fallbackExecution=true)

前提条件:

  • 开启事务管理:@EnableTransactionManagement
  • 配置事务管理器: PlatformTransactionManager(这一步@EnableTransactionManagement已默认实现,如须按需配置可手动配置)
  • 使用@Transactional处理一个事务

示例代码如下(前提已开启事务):

订单服务-“订单入库”:

@Slf4j
@Service
public class OrderService {

    @Autowired
    private EventPublisher createOrderEventPublisher;

    /**
     * 插入订单表操作
     */
    @Transactional(rollbackFor = Exception.class)
    public void insert(Order order) {
        log.info("[订单入库] start");

        createOrderEventPublisher.publishCreateOrderEvent(new CreateOrderEvent(order));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("[订单入库] end");
    }

}

两个新的监听器,一个不是事务的,一个是事务的做对比:

@EventListener(condition = "#createOrderEvent.order.status == 3")
public void processCreateOrderEvent3(CreateOrderEvent createOrderEvent) {
    log.info("[EventListener] 注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}

@TransactionalEventListener(condition = "#createOrderEvent.order.status == 3", phase = TransactionPhase.AFTER_COMMIT)
public void processCreateOrderEvent4(CreateOrderEvent createOrderEvent) {
    log.info("[TransactionalEventListener] 注解式 spring event 收到消息,订单名称:{} ; 订单状态为:{} ; 开始处理相应的事件。", createOrderEvent.getOrder().getOrderName(), createOrderEvent.getOrder().getStatus());
}

控制器里的事务调用请求处理:

@PostMapping("/spring/createOrder_transaction")
public String createOrder_transaction(Order order) {
    orderService.insert(order);
    return "ok";
}

开启服务发请求测试:

curl -X POST \
  http://localhost:9090/event/spring/createOrder_transaction \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/x-www-form-urlencoded' \
  -H 'postman-token: 1e9d2357-ba51-6aa6-156b-6fe2559cdb06' \
  -d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=3&userName=%E5%BC%A0%E4%B8%89'

结果如下:

2019-02-21 14:07:31.467  INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.service.OrderService   : [订单入库] start
2019-02-21 14:07:31.468  INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.spring.EventPublisher  : 发布了一个[订单创建]事件:CreateOrderEvent(order=Order(id=1, orderName=53度2018飞天茅台, status=3, userName=张三))
2019-02-21 14:07:31.482  INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.CreateOrderListener    : [EventListener] 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:3 ; 开始处理相应的事件。
2019-02-21 14:07:31.483  INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.DuanXinNoticeListener  : 实现接口方式:收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 14:07:31.483  INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.WeiXinNoticeListener   : 实现接口方式:收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 14:07:36.483  INFO 24496 --- [nio-9090-exec-1] c.gemantic.wealth.service.OrderService   : [订单入库] end
2019-02-21 14:07:36.484  INFO 24496 --- [nio-9090-exec-1] c.g.wealth.spring.CreateOrderListener    : [TransactionalEventListener] 注解式 spring event 收到消息,订单名称:53度2018飞天茅台 ; 订单状态为:3 ; 开始处理相应的事件。

请求大约5秒后返回,可以看到控制台,EventListener是直接处理的,TransactionalEventListener是在订单insert方法调用后再处理。这是因为TransactionalEventListener默认的处理是事务commit之后处理的,这里可以改注解的phase属性。参考枚举类:TransactionPhase

public enum TransactionPhase {
    BEFORE_COMMIT,
    AFTER_COMMIT,
    AFTER_ROLLBACK,
    AFTER_COMPLETION
}

补充3: 事件的父类监听

如果事件继承了某一父类,此父类也有监听,则每次发布事件,父类子类的监听器都会执行。

Guava EventBus

Google的进程内的发布订阅模式的实现,轻量级消息系统。 可参考:

那么场景依然是订单与通知场景,下面示例代码为集成在spring中的示例。

依赖guava包

maven引入:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>

创建一个 Event

普通类即可以。

@Data
@AllArgsConstructor
public class CreateOrderEvent {
    private Order order;
}

创建一个 Publisher

发布者即 EventBus类,因为EventBus不是单例的,我们使用中又想用单例,创建一个事件管理器类。

/**
 * 事件管理器:统一管理所有事件
 */
public class EventBusManager {

    /**
     * 创建订单
     */
    public final static EventBus CREATE_ORDER = new EventBus();
}

监听事件

可以优雅的使用@Subscribe表示监听某一事件,这个注解没有其他属性。

除此之外,监听器需要手动注册到EventBus中类如:EventBus.register(listener)

方便起见,我们可以在spring容器初始化时注册监听器,并抽象之。

/**
 * 抽象的监听器
 */
public abstract class AbstractBusListener {

    /**
     * 统一在spring启动时注册事件
     */
    @PostConstruct
    public void init() {
        registerEventBus();
    }

    private void registerEventBus() {
        getEventBus().register(this);
    }

    protected abstract EventBus getEventBus();
}

@Slf4j
@Component
public class CreateOrderBusListener extends AbstractBusListener {

    /**
     * 订单创建事件:短信通知
     */
    @Subscribe
    public void onCreateOrderEvent2DuanxinNotice(CreateOrderEvent event) {
        log.info("收到[创建订单]事件。[短信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
    }

    /**
     * 订单创建事件:微信通知
     */
    @Subscribe
    public void onCreateOrderEvent2WeixinNotice(CreateOrderEvent event) {
        log.info("收到[创建订单]事件。[微信通知]:亲爱的{},您的订单[{}]已被创建", event.getOrder().getUserName(), event.getOrder().getOrderName());
    }

    /**
     * 注册的为订单创建事件
     */
    @Override
    protected EventBus getEventBus() {
        return EventBusManager.CREATE_ORDER;
    }
}

发布事件

发布事件也是通过EventBus发布,通过其post(event)方法发布。

示例:

@PostMapping("/eventbus/createOrder")
public String createOrderByEventBus(Order order) {
    EventBusManager.CREATE_ORDER.post(new com.gemantic.wealth.eventbus.CreateOrderEvent(order));
    return "ok";
}

测试创建订单事件

发送请求:

curl -X POST \
  http://localhost:9090/event/eventbus/createOrder \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/x-www-form-urlencoded' \
  -H 'postman-token: 6b5255d5-2b1d-e9c1-7ef3-a5678f7a73ad' \
  -d 'id=1&orderName=53%E5%BA%A62018%E9%A3%9E%E5%A4%A9%E8%8C%85%E5%8F%B0&status=1&userName=%E5%BC%A0%E4%B8%89'

看控制台打印:

2019-02-21 17:34:04.276  INFO 24556 --- [nio-9090-exec-1] c.g.w.eventbus.CreateOrderBusListener    : 收到[创建订单]事件。[微信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建
2019-02-21 17:34:04.276  INFO 24556 --- [nio-9090-exec-1] c.g.w.eventbus.CreateOrderBusListener    : 收到[创建订单]事件。[短信通知]:亲爱的张三,您的订单[53度2018飞天茅台]已被创建

补充1:使用异步事件处理

EventBus实现上改为AsyncEventBus即可,构造器内需要再配一个线程池。举例:

public final static EventBus CREATE_ORDER = new AsyncEventBus(Executors.newFixedThreadPool(5));

补充2:DeadEvent

介绍:

Wraps an event that was posted, but which had no subscribers and thus could not be delivered. Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system’s event distribution.

所有没被监听的事件都会被包装成一个DeadEvent。这种情况,只需要提供一个监听DeadEvent事件的监听器即可统一处理

补充3: 事件的父类监听

如果事件继承了某一父类,此父类也有监听,则每次发布事件,父类子类的监听器都会执行。

补充4: @AllowConcurrentEvents

描述:

Marks an event subscriber method as being thread-safe. This annotation indicates that EventBus may invoke the event subscriber simultaneously from multiple threads. This does not mark the method, and so should be used in combination with Subscribe.

创建监听器时会根据此注解判断创建普通监听者还是同步的监听者:

class Subscriber {

  /**
   * Creates a {@code Subscriber} for {@code method} on {@code listener}.
   */
  static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }

  private static boolean isDeclaredThreadSafe(Method method) {
    return method.getAnnotation(AllowConcurrentEvents.class) != null;
  }

  ...
}


同步的监听者则会加锁同步调用订阅方法:

  @VisibleForTesting
  static final class SynchronizedSubscriber extends Subscriber {

    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
      super(bus, target, method);
    }

    @Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }
  }

加锁会有一定的开销,所以如果是同步的EventBus监听者或者是你确认线程安全的AsyncEventBus监听者,最好标注注解@AllowConcurrentEvents

事件处理异常

Spring EventGuava EventBus
同步会对其他监听器有影响不影响其他监听器的处理
异步不影响其他监听器的处理不影响其他监听器的处理

针对同步的情况单独说下。

EventBus

事件处理逻辑,无论同步异步都是用线程执行,区别在于同步的情况使用的是同一个线程执行。

final void dispatchEvent(final Object event) {
        this.executor.execute(new Runnable() {
            public void run() {
                try {
                    Subscriber.this.invokeSubscriberMethod(event);
                } catch (InvocationTargetException var2) {
                    Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
                }

            }
        });
    }

this.executor是:

public static Executor directExecutor() {
    return MoreExecutors.DirectExecutor.INSTANCE;
}

可以看到EventBus把异常都统一catch了,异常不上抛,别的监听器依然正常处理。

Spring

Spring调用的时候的异常都是往外抛的:

protected Object doInvoke(Object... args) {
    Object bean = getTargetBean();
    ReflectionUtils.makeAccessible(this.bridgedMethod);
    try {
        return this.bridgedMethod.invoke(bean, args);
    }
    catch (IllegalArgumentException ex) {
        assertTargetBean(this.bridgedMethod, bean, args);
        throw new IllegalStateException(getInvocationErrorMessage(bean, ex.getMessage(), args), ex);
    }
    catch (IllegalAccessException ex) {
        throw new IllegalStateException(getInvocationErrorMessage(bean, ex.getMessage(), args), ex);
    }
    catch (InvocationTargetException ex) {
        // Throw underlying exception
        Throwable targetException = ex.getTargetException();
        if (targetException instanceof RuntimeException) {
            throw (RuntimeException) targetException;
        }
        else {
            String msg = getInvocationErrorMessage(bean, "Failed to invoke event listener method", args);
            throw new UndeclaredThrowableException(targetException, msg);
        }
    }
}

分别适合的场景

异步情况下,都是多线程处理,无差异。

同步情况下,同一线程处理,由于Spring会抛出异常,适合事件有依赖的情况,而EventBus适合事件互不依赖的场景。

举个栗子:

  • 下订单场景一。订单已经创建,那么会发送短信通知,微信通知,此二者之间无依赖,适合EventBus.
  • 下订单场景二。如果创建订单也作为事件,那么微信短信通知需依赖订单事件已完成,如果创建订单异常失败则不需要发送通知,适合Spring Event

异步实现的区别

EventBus的异步实现

EventBus的异步实现是依赖于线程池的,在创建的时候确定,实际上是在监听器调用方法的时候采用的异步,同一个EventBus都是同一种处理方式(同步or异步)。

Spring的异步实现

Spring是通过拦截器的机制,被@Async的方法都会被AsyncExecutionInterceptor拦截,然后采用多线程的方式调用:

public Object invoke(final MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
    if (executor == null) {
        throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
    } else {
        Callable<Object> task = new Callable<Object>() {
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future)result).get();
                    }
                } catch (ExecutionException var2) {
                    AsyncExecutionInterceptor.this.handleError(var2.getCause(), userDeclaredMethod, invocation.getArguments());
                } catch (Throwable var3) {
                    AsyncExecutionInterceptor.this.handleError(var3, userDeclaredMethod, invocation.getArguments());
                }

                return null;
            }
        };
        // 异步调用
        return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
    }
}

异步方式小结

综上,还是Spring Event更加灵活多变,适用场景更多。

Spring Event与 Guava EventBus的比较

Spring EventGuava EventBus
Event任意对象任意对象
PublisherEventBusApplicationEventPublisher
Subscriber@EventListener@Subscribe
发布方法ApplicationEventPublisher#publishEventEventBus#post
注册方法spring自动注册手动注册:EventBus#register
是否支持异步支持。@Async支持。AsyncEventBus
是否支持事务支持。@TransactionalEventListener不支持。
是否支持条件过滤支持。不支持。
是否支持DeadEvent不支持。支持。
是否支持事件继承支持。支持。
同步时是否支持排序支持。不支持。
事件异常处理异常上抛捕获异常并不上抛
复杂度复杂轻量

总结

Spring EventGuava EventBus在设计上略有不同,可根据业务场景按需使用, 总的来说Guava EventBus比较轻量一些,适合大多数业务场景,而Spring Event 稍微重了,但也开箱可用,与Spring天然一体,多了很多花哨的功能也适合更多的复杂场景。

-------------本文结束,感谢您的阅读-------------
贵在坚持,如果您觉得本文还不错,不妨打赏一下~
0%