当前位置:网站首页>Handwritten event publish subscribe framework

Handwritten event publish subscribe framework

2022-04-23 16:57:00 Smoke I

One 、 Preface

​ Publish subscribe mode is also called observer mode , This mode can be used for code decoupling , Many frameworks use this design pattern , such as Spring Event mechanism ,guava Of EventBus( Event bus ) etc. , If you don't know the observer mode, you can check your previous blog 《 Observer mode of design mode 》.

​ In order to better understand the implementation principle of the existing event framework , He wrote a simple event release / Subscription framework for your reference .

Two 、 Design code

First, create an event class to inherit , All events inherit this class .

/**
 * @author 2YSP
 * @date 2022/4/16 16:00
 */
public class Event extends EventObject {


    /**
     * Constructs a prototypical Event.
     *
     * @param source The object on which the Event initially occurred.
     * @throws IllegalArgumentException if source is null.
     */
    public Event(Object source) {
        super(source);
    }
}

JDK Require all events to inherit EventObject, And pass source Get the event source .

Then define the event listener interface EventListener

/**
 * @author 2YSP
 * @description:  Event listener 
 * @date 2022/4/10 14:45
 */
public interface EventListener<E extends Event> {

    /**
     *  Triggering event 
     * @param e
     */
    void onEvent(E e);

}

The core part is that you need a class to manage all event listeners , Have the following three methods :

registerListener(): Register an event listener

removeListener(): Remove event listener

notifyListener(): Notify all listeners triggered by this event

package cn.sp.event;

import com.google.common.collect.Lists;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {
    /**
     *  event map
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);


    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     *  Initialize the event cache map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            //  Reflection acquisition onEvent The parameter type of the method 
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    //  Parameter must be Event Subclasses of 
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     *  Register an event listener 
     *
     * @param clazz
     * @param eventListener
     * @param <E>
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        List<EventListener> list = map.get(clazz);
        if (CollectionUtils.isEmpty(list)) {
            map.put(clazz, Lists.newArrayList(eventListener));
        } else {
            list.add(eventListener);
            map.put(clazz, list);
        }
    }

    /**
     *  Remove an event listener 
     *
     * @param clazz
     * @param <E>
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     *  Notify all listeners of this event 
     *
     * @param <E>
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
                //  Synchronous execution 
                eventListener.onEvent(e);
        });
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}

adopt initEventMap() Method after the project is started , Register all event listeners with reflection , however notifyListener() The method is serial execution , If you want to execute asynchronously, add a tag annotation @AsyncExecute That's it , The optimized version is as follows :

/**
 * @author 2YSP
 * @date 2022/4/16 17:35
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncExecute {

}

EventManager

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {
    /**
     *  event map
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);


    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     *  Event execution thread pool 
     */
    private static ExecutorService eventPool = new ThreadPoolExecutor(4,
            8, 30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512), new ThreadFactoryBuilder().setNameFormat("event-pool-%d").build());

    /**
     *  Initialize the event cache map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            //  Reflection acquisition onEvent The parameter type of the method 
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    //  Parameter must be Event Subclasses of 
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     *  Register an event listener 
     *
     * @param clazz
     * @param eventListener
     * @param <E>
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        List<EventListener> list = map.get(clazz);
        if (CollectionUtils.isEmpty(list)) {
            map.put(clazz, Lists.newArrayList(eventListener));
        } else {
            list.add(eventListener);
            map.put(clazz, list);
        }
    }

    /**
     *  Remove an event listener 
     *
     * @param clazz
     * @param <E>
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     *  Notify all listeners of this event 
     *
     * @param <E>
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
            AsyncExecute asyncExecute = eventListener.getClass().getAnnotation(AsyncExecute.class);
            if (asyncExecute == null) {
                //  Synchronous execution 
                eventListener.onEvent(e);
            } else {
                //  Asynchronous execution 
                eventPool.execute(() -> eventListener.onEvent(e));
            }
        });
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}

@AsyncExecute Annotations can be used on classes , Every time you call notifyListener() Method to determine the existence of by reflection @AsyncExecute annotation , If it exists, execute asynchronously with thread pool , In fact, the performance of reflection is not very good , If you pursue performance, you can consider maintaining the information of whether to execute asynchronously into the event cache during initialization map in .

Now there is a tool class for publishing events EventPublisher

/**
 * @author 2YSP
 * @date 2022/4/16 16:07
 */
@Component
public class EventPublisher<E extends Event> {

    @Resource
    private EventManager eventManager;

    public <E extends Event> void publish(E event) {
        eventManager.notifyListener(event);
    }
}

3、 ... and 、 test

The test scenario is after the order is created , An order creation event occurs , Then two listeners listened to the event , The difference is that one is used @AsyncExecute annotation , A no .

  1. establish Order Entity
public class Order {

    private String orderNo;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }
}

2. Create order creation event

public class OrderCreateEvent extends Event {

    private Order order;

    public OrderCreateEvent(Object source, Order order) {
        super(source);
        this.order = order;
    }

    public Order getOrder() {
        return order;
    }

    public void setOrder(Order order) {
        this.order = order;
    }
}

3. Create an event listener

@Component
public class OrderCreateEventListener implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "-- Listen for order creation events .........");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}


@AsyncExecute
@Component
public class OrderCreateEventListener2 implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "-- Listen for order creation events 2.........");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}

4. Release events

@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private OrderService orderService;

    @PostMapping("")
    public void create(@RequestBody Order order) {
        orderService.create(order);
    }
}


@Service
public class OrderService {

    @Resource
    private EventPublisher<OrderCreateEvent> publisher;


    /**
     *  Create order 
     *
     * @param order
     */
    public void create(Order order) {
        //  Send order creation event 
        order.setOrderNo("sssss");
        publisher.publish(new OrderCreateEvent(this, order));
    }
}

The test code is written , Start the project request order creation interface http://localhost:8080/order, The console output is as follows

http-nio-8080-exec-2-- Listen for order creation events .........
sssss
event-pool-0-- Listen for order creation events 2.........
sssss

It means that both event listeners are triggered , And the thread name is different , One is that the main thread executes synchronously , The other is thread pool asynchrony , So far, the test is successful .

Four 、 summary

After writing, discover and implement a release / The subscription framework is not difficult , Of course, this function is relatively simple , There is room for optimization , The code has been uploaded to github, Click to view .

版权声明
本文为[Smoke I]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204231653591274.html