I. Preface
The publish/subscribe pattern is closely related to the observer pattern and is often called by the same name. It is useful for decoupling code, and many frameworks use it, such as Spring's event mechanism and Guava's EventBus. If you are not familiar with the observer pattern, see my earlier post "The Observer Pattern in Design Patterns".
To better understand how existing event frameworks work, I wrote a simple event publish/subscribe framework for reference.
II. Design and code
First, create a base event class. All events extend this class.
import java.util.EventObject;

/**
 * @author 2YSP
 * @date 2022/4/16 16:00
 */
public class Event extends EventObject {

    /**
     * Constructs a prototypical Event.
     *
     * @param source The object on which the Event initially occurred.
     * @throws IllegalArgumentException if source is null.
     */
    public Event(Object source) {
        super(source);
    }
}
By JDK convention, all event classes extend java.util.EventObject, and the event source is obtained through getSource().
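As a small illustration of this convention, the sketch below defines a hypothetical UserRegisteredEvent (not part of the framework in this post) that extends EventObject directly and reads the source back with getSource(). The class and field names are assumptions made only for the example.

```java
import java.util.EventObject;

public class EventSourceDemo {

    // Hypothetical event type used only for this illustration.
    static class UserRegisteredEvent extends EventObject {
        private final String userName;

        UserRegisteredEvent(Object source, String userName) {
            // EventObject throws IllegalArgumentException if source is null
            super(source);
            this.userName = userName;
        }

        String getUserName() {
            return userName;
        }
    }

    public static void main(String[] args) {
        Object publisher = new Object();
        UserRegisteredEvent event = new UserRegisteredEvent(publisher, "alice");

        // getSource() returns the same object that was passed to the constructor
        System.out.println(event.getSource() == publisher);
        System.out.println(event.getUserName());
    }
}
```

Carrying the source in the event lets a listener find out who published it without the publisher and the listener referencing each other directly.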
Next, define the event listener interface EventListener:
/**
 * @author 2YSP
 * @description: Event listener
 * @date 2022/4/10 14:45
 */
public interface EventListener<E extends Event> {

    /**
     * Called when an event of type E is published.
     *
     * @param e the published event
     */
    void onEvent(E e);
}
The core of the framework is a class that manages all event listeners. It has three methods:
registerListener(): registers an event listener
removeListener(): removes the listeners registered for an event type
notifyListener(): notifies all listeners registered for an event
package cn.sp.event;

import com.google.common.collect.Lists;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {

    /**
     * Maps each event type to the listeners registered for it
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);

    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     * Initialize the event cache map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            // Use reflection to get the parameter type of the onEvent method
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    // The parameter must be a subclass of Event; this skips the
                    // compiler-generated bridge method onEvent(Event)
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     * Register an event listener
     *
     * @param clazz         the event type to listen for
     * @param eventListener the listener to register
     * @param <E>           the event type
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        // Create the listener list on first registration, then append to it
        map.computeIfAbsent(clazz, k -> Lists.newArrayList()).add(eventListener);
    }

    /**
     * Remove all listeners registered for an event type
     *
     * @param clazz the event type whose listeners are removed
     * @param <E>   the event type
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     * Notify all listeners of this event
     *
     * @param e   the event to dispatch
     * @param <E> the event type
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
            // Synchronous execution
            eventListener.onEvent(e);
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}
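The reflection step in initEventMap() can be looked at in isolation. The following Spring-free sketch shows why a listener class declares two onEvent methods (the real one and a compiler-generated bridge method that takes the erased type Event) and how the concrete event type can be picked out. It uses Method.isBridge() instead of the class-name comparison above, which is a swapped-in alternative rather than the code from this post; the nested types and the helper resolveEventType are assumptions made only for the example.

```java
import java.lang.reflect.Method;
import java.util.EventObject;

public class ListenerTypeDemo {

    // Minimal stand-ins for the types defined in this post.
    static class Event extends EventObject {
        Event(Object source) {
            super(source);
        }
    }

    static class OrderCreateEvent extends Event {
        OrderCreateEvent(Object source) {
            super(source);
        }
    }

    interface EventListener<E extends Event> {
        void onEvent(E e);
    }

    static class OrderListener implements EventListener<OrderCreateEvent> {
        @Override
        public void onEvent(OrderCreateEvent e) {
            // no-op, only the signature matters here
        }
    }

    /** Returns the event type declared by the listener class, or null if none is found. */
    static Class<?> resolveEventType(Class<?> listenerClass) {
        for (Method method : listenerClass.getDeclaredMethods()) {
            // Skip the synthetic bridge method onEvent(Event) created by type erasure
            if (method.getName().equals("onEvent")
                    && !method.isBridge()
                    && method.getParameterCount() == 1) {
                return method.getParameterTypes()[0];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        for (Method method : OrderListener.class.getDeclaredMethods()) {
            System.out.println(method.getName() + "(" + method.getParameterTypes()[0].getSimpleName()
                    + ") bridge=" + method.isBridge());
        }
        System.out.println(resolveEventType(OrderListener.class).getSimpleName());
    }
}
```

Both approaches only inspect methods declared on the listener class itself, so a listener that inherits onEvent from a superclass would not be resolved by either of them.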
After the application starts, initEventMap() registers all event listeners using reflection. However, notifyListener() invokes the listeners serially on the calling thread. To run a listener asynchronously, add a marker annotation, @AsyncExecute. The optimized version is as follows:
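import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author 2YSP
 * @date 2022/4/16 17:35
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncExecute {
}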
The updated EventManager:
// In addition to the imports of the previous version:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {

    /**
     * Maps each event type to the listeners registered for it
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);

    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     * Event execution thread pool
     */
    private static ExecutorService eventPool = new ThreadPoolExecutor(4,
            8, 30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512), new ThreadFactoryBuilder().setNameFormat("event-pool-%d").build());

    /**
     * Initialize the event cache map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            // Use reflection to get the parameter type of the onEvent method
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    // The parameter must be a subclass of Event; this skips the
                    // compiler-generated bridge method onEvent(Event)
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     * Register an event listener
     *
     * @param clazz         the event type to listen for
     * @param eventListener the listener to register
     * @param <E>           the event type
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        // Create the listener list on first registration, then append to it
        map.computeIfAbsent(clazz, k -> Lists.newArrayList()).add(eventListener);
    }

    /**
     * Remove all listeners registered for an event type
     *
     * @param clazz the event type whose listeners are removed
     * @param <E>   the event type
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     * Notify all listeners of this event
     *
     * @param e   the event to dispatch
     * @param <E> the event type
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
            AsyncExecute asyncExecute = eventListener.getClass().getAnnotation(AsyncExecute.class);
            if (asyncExecute == null) {
                // Synchronous execution
                eventListener.onEvent(e);
            } else {
                // Asynchronous execution
                eventPool.execute(() -> eventListener.onEvent(e));
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}
The @AsyncExecute annotation is applied at the class level. On every call to notifyListener(), reflection is used to check whether the listener class carries @AsyncExecute; if it does, the listener runs asynchronously on the thread pool. Reflection has a cost, so if performance matters, consider recording whether each listener is asynchronous in the event cache map during initialization.
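The caching idea mentioned above could be sketched as follows. This is a minimal, Spring-free illustration and not the code from this post: the Registration holder, the fixed-size pool, and the shutdownAndWait() helper are assumptions made for the example, and EventObject is used directly as the event type to keep the block self-contained. The annotation is looked up once when a listener is registered, and publish() only reads the stored boolean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.EventObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedAsyncDemo {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface AsyncExecute {
    }

    interface EventListener<E extends EventObject> {
        void onEvent(E e);
    }

    /** Hypothetical holder that pairs a listener with its precomputed async flag. */
    static final class Registration {
        final EventListener<EventObject> listener;
        final boolean async;

        Registration(EventListener<EventObject> listener) {
            this.listener = listener;
            // The annotation lookup happens once, at registration time
            this.async = listener.getClass().isAnnotationPresent(AsyncExecute.class);
        }
    }

    private final Map<Class<?>, List<Registration>> registrations = new HashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    @SuppressWarnings("unchecked")
    <E extends EventObject> void register(Class<E> type, EventListener<E> listener) {
        registrations.computeIfAbsent(type, k -> new ArrayList<>())
                .add(new Registration((EventListener<EventObject>) listener));
    }

    void publish(EventObject event) {
        List<Registration> list = registrations.get(event.getClass());
        if (list == null) {
            return;
        }
        for (Registration r : list) {
            if (r.async) {
                pool.execute(() -> r.listener.onEvent(event)); // no reflection here
            } else {
                r.listener.onEvent(event);
            }
        }
    }

    /** Stops the pool and waits briefly for queued asynchronous listeners to finish. */
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    static class SyncListener implements EventListener<EventObject> {
        volatile Thread ranOn;

        @Override
        public void onEvent(EventObject e) {
            ranOn = Thread.currentThread();
        }
    }

    @AsyncExecute
    static class AsyncListener implements EventListener<EventObject> {
        volatile Thread ranOn;

        @Override
        public void onEvent(EventObject e) {
            ranOn = Thread.currentThread();
        }
    }

    public static void main(String[] args) {
        CachedAsyncDemo manager = new CachedAsyncDemo();
        SyncListener sync = new SyncListener();
        AsyncListener async = new AsyncListener();
        manager.register(EventObject.class, sync);
        manager.register(EventObject.class, async);
        manager.publish(new EventObject("demo"));
        manager.shutdownAndWait();
        System.out.println("sync ran on:  " + sync.ranOn.getName());
        System.out.println("async ran on: " + async.ranOn.getName());
    }
}
```

The trade-off is that the flag is fixed at registration time, which fits here because a class-level annotation cannot change while the application runs.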
Next is a utility class for publishing events, EventPublisher:
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * @author 2YSP
 * @date 2022/4/16 16:07
 */
@Component
public class EventPublisher<E extends Event> {

    @Resource
    private EventManager eventManager;

    public void publish(E event) {
        eventManager.notifyListener(event);
    }
}
III. Test
The test scenario: after an order is created, an order creation event is published and two listeners receive it. The only difference between them is that one carries the @AsyncExecute annotation and the other does not.
1. Create the Order entity
public class Order {

    private String orderNo;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }
}
2. Create the order creation event
public class OrderCreateEvent extends Event {

    private Order order;

    public OrderCreateEvent(Object source, Order order) {
        super(source);
        this.order = order;
    }

    public Order getOrder() {
        return order;
    }

    public void setOrder(Order order) {
        this.order = order;
    }
}
3. Create the event listeners
@Component
public class OrderCreateEventListener implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "-- Listen for order creation events .........");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}

@AsyncExecute
@Component
public class OrderCreateEventListener2 implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "-- Listen for order creation events 2.........");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}
4. Publish the event
@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private OrderService orderService;

    @PostMapping("")
    public void create(@RequestBody Order order) {
        orderService.create(order);
    }
}

@Service
public class OrderService {

    @Resource
    private EventPublisher<OrderCreateEvent> publisher;

    /**
     * Create order
     *
     * @param order the order to create
     */
    public void create(Order order) {
        // Assign an order number (hard-coded for the test)
        order.setOrderNo("sssss");
        // Publish the order creation event
        publisher.publish(new OrderCreateEvent(this, order));
    }
}
With the test code written, start the project and send a POST request to the order creation endpoint http://localhost:8080/order. The console output is as follows:
http-nio-8080-exec-2-- Listen for order creation events .........
sssss
event-pool-0-- Listen for order creation events 2.........
sssss
Both listeners were triggered, and the thread names differ: one ran synchronously on the request-handling thread (http-nio-8080-exec-2), and the other ran asynchronously on the thread pool (event-pool-0). The test passes.
IV. Summary
Having written it, I found that implementing a publish/subscribe framework is not difficult. Of course, this one is fairly simple and leaves room for optimization. The code has been uploaded to GitHub.
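One possible optimization, offered only as a sketch: the listener map in EventManager is a plain HashMap holding ArrayList values, which is not safe if listeners are registered or removed while events are being published from several threads. The Spring-free example below swaps in ConcurrentHashMap and CopyOnWriteArrayList and also removes a single listener instead of the whole list. The class name ConcurrentRegistrySketch and the use of Consumer<Object> as the listener type are assumptions made for brevity, not part of the framework in this post.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class ConcurrentRegistrySketch {

    // Event type -> listeners; both collections tolerate concurrent access
    private final Map<Class<?>, List<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    /** Adds a listener for the given event type. */
    public void register(Class<?> eventType, Consumer<Object> listener) {
        listeners.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Removes one listener; returns true if it was registered. */
    public boolean remove(Class<?> eventType, Consumer<Object> listener) {
        List<Consumer<Object>> list = listeners.get(eventType);
        return list != null && list.remove(listener);
    }

    /** Notifies the listeners of the event's exact class; returns how many were called. */
    public int publish(Object event) {
        List<Consumer<Object>> list = listeners.get(event.getClass());
        if (list == null) {
            return 0;
        }
        int notified = 0;
        // CopyOnWriteArrayList iterates over a snapshot, so a concurrent
        // register() or remove() cannot cause a ConcurrentModificationException
        for (Consumer<Object> listener : list) {
            listener.accept(event);
            notified++;
        }
        return notified;
    }

    public static void main(String[] args) {
        ConcurrentRegistrySketch registry = new ConcurrentRegistrySketch();
        Consumer<Object> printer = e -> System.out.println("got " + e);
        registry.register(String.class, printer);
        System.out.println(registry.publish("order-1"));
        registry.remove(String.class, printer);
        System.out.println(registry.publish("order-2"));
    }
}
```

CopyOnWriteArrayList copies its backing array on every write, so it suits the usual case here, where listeners are registered rarely and events are published often.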