java EventBus 使用总结

2025-06-09 10:01:13 cctv5世界杯 808

目录

介绍使用方式总结扩展bus的结构**EventBus:****AsyncEventBus:**

注册订阅者发布事件彩蛋封装

介绍

EventBus是一个典型的事件分发器,Observer模式。订阅者通过register集中到EventBus类中,当发布者通过post MessageEvent时,通知到订阅者。适用于一对多,当一个消息需要被多次不同处理时使用。

使用方式

简单实用直接上代码

/**

* 创建一个自定义的事件

*/

public class EventOne {

private final String url;

public EventOne(String url) {

this.url = url;

}

public String getUrl(){

return this.url;

}

}

//创建两个订阅者,用来接收我们发送的自定义事件

/**

* 订阅者1

*/

public class ListenerOne {

@Subscribe

public void consume(EventOne event){

System.out.println("ListenerOne"+event.getUrl());

}

}

/**

* 订阅者2

*/

public class ListenerTwo {

@Subscribe

public void consume(EventOne event){

System.out.println("ListenerTwo"+event.getUrl());

}

}

//使用eventBus来把订阅者注册到EventBus类里面去,然后发送事件

public class GoogleEventBus {

public static void main(String[] args) {

EventBus eventBus = new EventBus(new SubscriberExceptionHandler(){

//这里是对订阅者抛出的异常做的自定义处理

@Override

public void handleException(Throwable exception, SubscriberExceptionContext context) {

System.out.println("处理异常,异常信息:"+ exception.getMessage());

}

});

eventBus.register(new ListenerOne());

eventBus.register(new ListenerTwo());

eventBus.post(new EventOne("fjlajflkaj"));

}

}

//结果

ListenerOnefjlajflkaj

ListenerTwofjlajflkaj

总结扩展

使用 EventBus 进行注册订阅者,发送事件,可以实现所有订阅者都收到了发送的事件,就是一个 发布/订阅 模式。

EventBus 还有个实现类:AsyncEventBus 两者的区别在于,前者是同步调用订阅者的消费方法(会阻塞主线程),后者则是异步调用(不会阻塞主线程),下面我们通过源码分析一下它的实现原理

bus的结构

我们先看一下 EventBus和AsyncEventBus

EventBus:

(源码中代码多了显得太乱,这里只展示重要的部分) executor:用来调用订阅者消费方法的线程池 exceptionHandler:自定义的异常处理 subscribers:用来存放订阅者信息 dispatcher:一个固定的对象(PerThreadQueuedDispatcher),EventBus提供的

我们new EventBus() 的时候 会给属性赋值

executor = MoreExecutors.directExecutor() 这里请记住他的execute方法

subscribers = new SubscriberRegistry(this); 就是创建了一个SubscriberRegistry对象,把自己放进了他的属性bus里了

dispatcher = Dispatcher.perThreadDispatchQueue() 创建了一个PerThreadQueuedDispatcher的对象

AsyncEventBus:

它继承了EventBus,和EventBus的区别有两点

1:executor是自定义的,new 的时候需要传进去 2:dispatcher不同,EventBus使用PerThreadQueuedDispatcher,AsyncEventBus使用LegacyAsyncDispatcher

看懂了bus的结构,那我们来看一下他是怎么把订阅者register到属性subscribers里去的

注册订阅者

我们先看第一步

这一步的意思是找到要注册的这个类里面所有带有@Subscribe注解的方法,然后获取方法的第一个参数,把方法的一些信息放进一个对象里(Subscriber)

比如这个类,他有两个@Subscribe注解的方法,被解析之后就变成了这样 methodsInListener = {EventOne.class:[Subscriber1,Subscriber2]} 事件对应方法的信息

那第一步之后的逻辑就好理解了 拿到解析好的数据之后,循环出来放进subscribers里,相同的事件类型放到同一个list里 最终存放在 subscribers 里的数据就是这个样子:

{

"Event1.class":[

"Subscriber1",

"Subscriber2",

"Subscriber3"

],

"Event2.class":[

"Subscriber1",

"Subscriber2",

"Subscriber3"

]

}

Event.class:事件类对象 Subscriber:订阅者消费方法信息

发布事件

post方法非常简单,先从subscribers里根据事件类型拿到所有的订阅者消费方法的信息,然后交给 dispatcher 的 dispatch 方法执行,那最主要的就是 dispatcher 的 dispatch 方法了。

这里是否还记得 EventBus 和 AsyncEventBus 的 dispatcher 是两个不同的对象!(不记得的话往上扒拉扒拉)

我们先看 EventBus 里的 PerThreadQueuedDispatcher 的 dispatch()

这里先是判断一下非空,然后放到一个ThreadLocal的队列里,最终从队列里取出来调了 Subscriber 的 dispatchEvent 方法(至于为什么中间加了两个ThreadLocal的属性,我目前还没看出来是啥原因,可能是怕多线程之间出现问题?)

dispatchEvent 方法就是交给线程池去执行,反射调用目标方法

AsyncEventBus 里的 LegacyAsyncDispatcher 的 dispatch()

这就好理解了,先放队列,然后从队列取,同上一样交给线程池去处理,然后method.invoke();

这里会不会有老表问一个问题?不管是EventBus还是AsyncEventBus,到最后都是交给线程池处理了,这不都是异步的吗?

如果有老表有这个疑问,可以再扒拉扒拉他们两个executor的execute方法有啥不一样!

彩蛋

最后给学习的老表提一个问题哈,AsyncEventBus异步是说多个不同事件之间的执行方式是异步的,那多个相同事件之间是不是异步的呢,EventBus提供了一个注解 @AllowConcurrentEvents 是解决多个相同事件之间异步的,至于他是怎么实现的,老表们可以自己带着问题去扒拉扒拉源码瞅瞅

封装

在学会了EventBus之后,老表们会不会有个想法呢? 你看我们每次都得往bus里注册订阅者,有多少个就得注册多少个,那我们能不能写个自动注册的呢,然后我们之后用的时候就能直接post就行了

直接上代码吧

public interface EventPark {

void post(Object eventObject);

void register(MyEventListener listener);

void unregister(MyEventListener listener);

}

------------------------------------------------------------------------------------------------------------------------------------------------------------

public abstract class AbstractEventPark implements EventPark, ApplicationListener {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEventPark.class);

protected abstract EventBus getEventBus();

protected abstract String getEventParkName();

@Override

public void register(MyEventListener listener) {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Registering listeners to {}: {}", getEventParkName(), listener);

}

boolean noSubscribeMethod = true;

Class clazz = AopUtils.getTargetClass(listener);

Method[] methods = clazz.getMethods();

for (Method method : methods) {

if (method.isAnnotationPresent(Subscribe.class)) {

noSubscribeMethod = false;

}

}

if (noSubscribeMethod) {

throw new RuntimeException(clazz + " must have an @Subscribe method!");

}

EventBus eventBus = getEventBus();

eventBus.register(listener);

}

@Override

public void unregister(MyEventListener listener) {

getEventBus().unregister(listener);

}

@Override

public void post(Object eventObject) {

getEventBus().post(eventObject);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("EventBus post event: {}", eventObject);

}

}

@Override

public void onApplicationEvent(ApplicationReadyEvent event) {

ConfigurableApplicationContext applicationContext = event.getApplicationContext();

Map beansOfType = applicationContext.getBeansOfType(MyEventListener.class);

Set> entries = beansOfType.entrySet();

for (Map.Entry entry : entries) {

this.register(entry.getValue());

}

}

}

------------------------------------------------------------------------------------------------------------------------------------------------------------

/**

*

异步的事件注册、提交中心

*

*

异步队列,不会阻塞发送 {@link #post(Object)} 线程,生产的消息会先 {@code add} 到 {@code Dispatcher#ConcurrentLinkedQueue} 队列里,然后 {@code pool} 队列,使用线程池处理。

*

消费者执行时会占用线程池队列,声明 @AllowConcurrentEvents 表示这个监听器支持多线程处理,否则同一时间只会处理一条消息。

*

*

如果没有监听者将发送死信事件 DeadEvent

* @author jingyu.li

* @date 2023-05-24

*/

@Component

public final class AsyncEventPark extends AbstractEventPark {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEventPark.class);

private final AsyncEventBus asyncEventBus;

private final ThreadPoolExecutor threadPoolExecutor;

// public AsyncEventPark() {

// this(Runtime.getRuntime().availableProcessors() * 2 + 1);

// }

public AsyncEventPark() {

this(2);

}

public AsyncEventPark(int coreSize) {

this(new ThreadPoolExecutor(coreSize, coreSize, 0, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<>(), new NamedThreadFactory("event-park")));

}

public AsyncEventPark(ThreadPoolExecutor executor) {

this.threadPoolExecutor = executor;

this.asyncEventBus = new AsyncEventBus("async-event-bus", executor);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("ThreadPoolExecutor current core pool size is {}", executor.getCorePoolSize());

}

}

@Override

protected EventBus getEventBus() {

return asyncEventBus;

}

@Override

protected String getEventParkName() {

return "AsyncEventPark";

}

public int getCorePoolSize() {

return this.threadPoolExecutor.getCorePoolSize();

}

public int getActiveCount() {

return this.threadPoolExecutor.getActiveCount();

}

public int getQueueSize() {

return this.threadPoolExecutor.getQueue().size();

}

}

------------------------------------------------------------------------------------------------------------------------------------------------------------

public interface MyEventListener {

/**

* 消费消息事件

* @param event 事件对象

*/

void consume(T event);

}

订阅者只需要实现MyEventListener接口,并实现consume方法,再带上@Subscribe注解,就会被自动注册到bus里去

我们用的时候就在类中注入EventPark,调用EventPark.post()方法,就ok了

有兴趣的老表可以试一试,我这就不再解释了

对EventBus的介绍纯个人理解,有不对的地方欢迎评论区的老表讨论。

梦想世界儒法全攻略(梦想世界儒术)
iPhone 11 Pro 快充测试:30 分钟可充电 50% 以上