upgrade event to eventbus; remove old event package

This commit is contained in:
aynakeya
2025-09-29 00:02:49 +08:00
parent 82662c308a
commit f13577f890
50 changed files with 356 additions and 1139 deletions

View File

@@ -1,165 +0,0 @@
package event
import (
"fmt"
"sync"
)
type EventId string
type Event struct {
Id EventId
Data interface{}
lock sync.Mutex // just placed for now, i don't know why i place it here, seems useless
}
type HandlerFunc func(event *Event)
type Handler struct {
EventId EventId
Name string
Handler HandlerFunc
}
type Manager struct {
handlers map[EventId]map[string]*Handler
eventWorkerIdMap map[EventId]int
pendingEvents []*Event
workerQueue []chan func()
currentWorkerId int
stopSig chan int
queueSize int
workerSize int
dispatching bool
lock sync.RWMutex
}
func NewManger(queueSize int, workerSize int) *Manager {
manager := &Manager{
handlers: make(map[EventId]map[string]*Handler),
eventWorkerIdMap: make(map[EventId]int),
workerQueue: make([]chan func(), workerSize),
currentWorkerId: 0,
stopSig: make(chan int, workerSize),
queueSize: queueSize,
workerSize: workerSize,
lock: sync.RWMutex{},
dispatching: false,
pendingEvents: make([]*Event, 0),
}
for i := 0; i < workerSize; i++ {
queue := make(chan func(), queueSize)
manager.workerQueue[i] = queue
go func() {
for {
select {
case <-manager.stopSig:
return
case f := <-queue:
f()
}
}
}()
}
return manager
}
// Start for starting to dispatching events
func (h *Manager) Start() {
h.dispatching = true
for _, event := range h.pendingEvents {
h.Call(event)
}
}
func (h *Manager) Stop() {
for i := 0; i < h.workerSize; i++ {
h.stopSig <- 0
}
h.dispatching = false
}
func (h *Manager) Register(handler *Handler) {
h.lock.Lock()
m, ok := h.handlers[handler.EventId]
// if not found, crate new handler map and assign a worker to this event id
if !ok {
m = make(map[string]*Handler)
h.handlers[handler.EventId] = m
// assign a worker to this event id
h.eventWorkerIdMap[handler.EventId] = h.currentWorkerId
h.currentWorkerId = (h.currentWorkerId + 1) % h.workerSize
}
if _, ok := m[handler.Name]; ok {
fmt.Printf("handler %s already registered, old handler is overwrittened\n", handler.Name)
}
m[handler.Name] = handler
h.lock.Unlock()
}
func (h *Manager) RegisterA(id EventId, name string, handler HandlerFunc) {
h.Register(&Handler{
EventId: id,
Name: name,
Handler: handler,
})
}
func (h *Manager) UnregisterAll() {
h.lock.Lock()
defer h.lock.Unlock()
h.handlers = make(map[EventId]map[string]*Handler)
h.currentWorkerId = 0
h.eventWorkerIdMap = make(map[EventId]int)
}
func (h *Manager) Unregister(name string) {
h.lock.Lock()
defer h.lock.Unlock()
for _, m := range h.handlers {
if _, ok := m[name]; ok {
delete(m, name)
}
}
}
func (h *Manager) Call(event *Event) {
// if not dispatching, put this event to pending events
h.lock.Lock()
if !h.dispatching {
h.pendingEvents = append(h.pendingEvents, event)
h.lock.Unlock()
return
}
handlers, ok := h.handlers[event.Id]
if !ok {
h.lock.Unlock()
return
}
workerId, ok := h.eventWorkerIdMap[event.Id]
if !ok {
// event id don't have a worker id, ignore
// maybe because this event id has no handler
h.lock.Unlock()
return
}
h.lock.Unlock()
for _, eh := range handlers {
eventHandler := eh
h.workerQueue[workerId] <- func() {
event.lock.Lock()
eventHandler.Handler(event)
event.lock.Unlock()
}
}
}
func (h *Manager) CallA(id EventId, data interface{}) {
h.Call(&Event{
Id: id,
Data: data,
})
}