Files
AynaLivePlayer/pkg/eventbus/bus_impl.go
2026-02-19 11:10:42 +08:00

419 lines
9.9 KiB
Go

package eventbus
import (
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
// handlerRec is one handler registration for a single eventId.
// Records are copied by value into tasks, so they are effectively immutable.
type handlerRec struct {
	name    string      // handler name, unique within its eventId
	fn      HandlerFunc // callback invoked with a (cloned) event
	once    bool        // if true, auto-unregister after first successful run
	channel string      // non-empty restricts delivery to events on the same channel
}
// task is the unit of work placed on a worker shard queue: one event
// paired with one handler registration to run it.
type task struct {
	ev *Event     // event instance delivered to the handler (cloned by PublishEvent)
	h  handlerRec // handler to invoke
}
// bus implements Bus.
//
// Dispatch model: each eventId is pinned to one worker shard (see
// workerIdxes), so handlers for the same event run in publish order.
// Call() rendezvous: replies are routed via echoWaiter, keyed by
// eventId+EchoId, instead of going through normal dispatch.
type bus struct {
	// configuration
	maxWorkerSize int // number of worker goroutines / shards
	queueSize     int // buffered capacity of each shard queue
	// workers
	queues []chan task    // one buffered task queue per worker shard
	wg     sync.WaitGroup // counts enqueued-but-unfinished tasks (Add in PublishEvent, Done in worker)
	// lifecycle
	started   atomic.Bool   // set by Start(); gates the pending-event flush
	stopping  atomic.Bool   // set by Stop(); makes PublishEvent reject new events
	stopOnce  sync.Once     // makes Stop() idempotent
	stopCh    chan struct{} // closed on Stop(); signals workers (and Call) to bail out
	drainedCh chan struct{} // NOTE(review): never closed or read in this file — confirm intent
	// routing & bookkeeping
	mu          sync.RWMutex
	handlers    map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec
	workerIdxes map[string]int                   // eventId -> workerIdx
	pending     []*Event                         // events published before Start()
	// rendezvous for Call/EchoId
	waitMu     sync.Mutex
	echoWaiter map[string]chan *Event // (replyEventId+echoId) -> waiting Call() channel
	// simple id source for EchoId if caller doesn't provide
	idCtr atomic.Uint64
	// logger
	log Logger
	// worker context markers for deadlock detection in Call()
	workerCtxMu sync.RWMutex
	workerCtx   map[uint64]int // goroutine id -> worker idx
}
// New creates a Bus and immediately starts its worker goroutines.
// Defaults: 10 workers, per-shard queue size 100, package-level logger.
// Callers must keep workerCount >= 1 and queueSize >= 1.
func New(opts ...Option) Bus {
	cfg := options{
		log:           Log,
		maxWorkerSize: 10,
		queueSize:     100,
	}
	for _, apply := range opts {
		apply(&cfg)
	}
	b := &bus{
		maxWorkerSize: cfg.maxWorkerSize,
		queueSize:     cfg.queueSize,
		queues:        make([]chan task, 0, cfg.maxWorkerSize),
		stopCh:        make(chan struct{}),
		drainedCh:     make(chan struct{}),
		handlers:      make(map[string]map[string]handlerRec),
		workerIdxes:   make(map[string]int),
		pending:       make([]*Event, 0, 16),
		echoWaiter:    make(map[string]chan *Event),
		log:           cfg.log,
		workerCtx:     make(map[uint64]int),
	}
	for idx := 0; idx < cfg.maxWorkerSize; idx++ {
		b.addWorker(idx)
	}
	return b
}
// addWorker allocates the shard queue for workerIdx, records it, and
// launches the worker goroutine that drains it.
func (b *bus) addWorker(workerIdx int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	queue := make(chan task, b.queueSize)
	b.queues = append(b.queues, queue)
	go b.workerLoop(workerIdx, queue)
}
// workerLoop consumes tasks from q until stopCh is closed.
// Each task runs with panic recovery, and the goroutine id is recorded
// in workerCtx for the handler's duration so Call() can detect
// same-shard synchronous calls that would deadlock.
func (b *bus) workerLoop(workerIdx int, q chan task) {
	for {
		select {
		case <-b.stopCh:
			// Immediate stop: drain remaining tasks without executing them.
			// BUGFIX: each dropped task must still call wg.Done() —
			// PublishEvent did wg.Add(1) when enqueueing it — otherwise a
			// Wait() issued after Stop() blocks forever on tasks that will
			// never run.
			for {
				select {
				case <-q:
					b.wg.Done()
				default:
					return
				}
			}
		case t := <-q:
			b.runTask(workerIdx, t)
		}
	}
}

// runTask executes a single task: registers the worker context for
// deadlock detection, recovers from handler panics, always balances the
// WaitGroup, and unregisters once-handlers after a non-panicking run.
func (b *bus) runTask(workerIdx int, t task) {
	gid := curGID()
	b.workerCtxMu.Lock()
	b.workerCtx[gid] = workerIdx
	b.workerCtxMu.Unlock()
	defer func() {
		b.workerCtxMu.Lock()
		delete(b.workerCtx, gid)
		b.workerCtxMu.Unlock()
		if r := recover(); r != nil {
			b.log.Printf("handler panic recovered: event=%s handler=%s panic=%v", t.ev.Id, t.h.name, r)
		}
		b.wg.Done()
	}()
	// Execute handler.
	t.h.fn(t.ev)
	// Once-handlers unregister only after a successful (non-panicking) run,
	// preserving the original "first successful run" semantics.
	if t.h.once {
		_ = b.Unsubscribe(t.ev.Id, t.h.name)
	}
}
// Start marks the bus as started and flushes every event that was
// published before this call. Repeated calls are no-ops.
func (b *bus) Start() error {
	if b.started.Swap(true) {
		return nil // already started
	}
	// Take ownership of the pre-start backlog under the lock.
	b.mu.Lock()
	backlog := b.pending
	b.pending = nil
	b.mu.Unlock()
	for _, ev := range backlog {
		if err := b.PublishEvent(ev); err != nil {
			b.log.Printf("failed to publish event: %v", err)
		}
	}
	return nil
}
// Wait blocks until all tasks that were enqueued before this call have
// finished (or, after Stop(), have been accounted for by the workers).
//
// FIX: the original spawned a goroutine, closed a channel, and received
// it through a single-case select — all of which is exactly equivalent
// to calling wg.Wait() directly (staticcheck S1000 flags the degenerate
// select). The simplified form has identical blocking behavior.
func (b *bus) Wait() error {
	b.wg.Wait()
	return nil
}
// Stop requests an immediate shutdown of all workers. It is safe to
// call any number of times; only the first call has an effect.
func (b *bus) Stop() error {
	b.stopOnce.Do(func() {
		b.stopping.Store(true) // make PublishEvent reject new events
		close(b.stopCh)        // wake every worker for immediate exit
	})
	return nil
}
// Subscribe registers fn under handlerName for eventId. A non-empty
// channel restricts delivery to events published on the same channel.
// Registering an existing handlerName replaces the previous handler.
func (b *bus) Subscribe(channel string, eventId, handlerName string, fn HandlerFunc) error {
	if eventId == "" || handlerName == "" || fn == nil {
		return errors.New("invalid Subscribe args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	recs, ok := b.handlers[eventId]
	if !ok {
		recs = make(map[string]handlerRec)
		b.handlers[eventId] = recs
		// First subscription for this eventId: pin it to a worker shard,
		// round-robin over the number of distinct event ids seen so far.
		b.workerIdxes[eventId] = len(b.workerIdxes) % b.maxWorkerSize
	}
	recs[handlerName] = handlerRec{name: handlerName, fn: fn, channel: channel}
	return nil
}
// SubscribeAny registers fn for eventId without a channel restriction:
// an empty channel matches events published on any channel.
func (b *bus) SubscribeAny(eventId, handlerName string, fn HandlerFunc) error {
	const anyChannel = ""
	return b.Subscribe(anyChannel, eventId, handlerName, fn)
}
// SubscribeOnce registers fn like Subscribe, except the handler is
// automatically unregistered after its first successful (non-panicking)
// execution.
func (b *bus) SubscribeOnce(channel, eventId, handlerName string, fn HandlerFunc) error {
	if eventId == "" || handlerName == "" || fn == nil {
		return errors.New("invalid SubscribeOnce args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	recs, ok := b.handlers[eventId]
	if !ok {
		recs = make(map[string]handlerRec)
		b.handlers[eventId] = recs
		// First subscription for this eventId: pin it to a worker shard,
		// round-robin over the number of distinct event ids seen so far.
		b.workerIdxes[eventId] = len(b.workerIdxes) % b.maxWorkerSize
	}
	recs[handlerName] = handlerRec{channel: channel, name: handlerName, fn: fn, once: true}
	return nil
}
// Unsubscribe removes handlerName for eventId. Removing a handler that
// does not exist is not an error. The eventId's shard assignment in
// workerIdxes is intentionally left in place.
func (b *bus) Unsubscribe(eventId, handlerName string) error {
	if eventId == "" || handlerName == "" {
		return errors.New("invalid Unsubscribe args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	recs, ok := b.handlers[eventId]
	if !ok {
		return nil
	}
	delete(recs, handlerName)
	if len(recs) == 0 {
		// Drop the now-empty inner map so lookups stay cheap.
		delete(b.handlers, eventId)
	}
	return nil
}
// Publish dispatches data under eventId on the default (empty) channel.
func (b *bus) Publish(eventId string, data interface{}) error {
	ev := &Event{Id: eventId, Data: data}
	return b.PublishEvent(ev)
}
// PublishToChannel dispatches data under eventId, tagged with channel
// so delivery is restricted to subscribers of that channel (handlers
// with an empty channel still match).
func (b *bus) PublishToChannel(channel string, eventId string, data interface{}) error {
	ev := &Event{Id: eventId, Channel: channel, Data: data}
	return b.PublishEvent(ev)
}
// PublishEvent dispatches ev to all matching subscribers.
//
// Semantics, in order:
//   - nil events and empty ids are rejected;
//   - while the bus is stopping, events are dropped with an error;
//   - if ev.EchoId matches a waiter registered by Call(), the event is
//     a reply: it is delivered to that waiter only, never to regular
//     subscribers;
//   - before Start(), events are buffered in b.pending and flushed by
//     Start();
//   - otherwise each matching handler receives a shallow clone of ev on
//     its shard queue. The send blocks when the queue is full, which
//     both bounds memory and preserves per-shard ordering.
func (b *bus) PublishEvent(ev *Event) error {
	if ev == nil || ev.Id == "" {
		return errors.New("invalid PublishEvent args")
	}
	// If stopping, drop events.
	if b.stopping.Load() {
		return errors.New("bus is stopping")
	}
	// Rendezvous: reply delivery for a pending Call().
	if ev.EchoId != "" {
		b.waitMu.Lock()
		if ch, ok := b.echoWaiter[ev.Id+ev.EchoId]; ok {
			select {
			case ch <- ev:
			default:
				// waiter buffer (size 1) already holds a reply; drop extras
			}
			// in this case, we found this event belong to local call
			// so we don't need to dispatch this event to other subscriber
			b.waitMu.Unlock()
			return nil
		}
		b.waitMu.Unlock()
	}
	b.mu.RLock()
	if !b.started.Load() {
		// Publish-before-start: buffer the event under the write lock.
		b.mu.RUnlock()
		b.mu.Lock()
		if !b.started.Load() {
			b.pending = append(b.pending, cloneEvent(ev))
			b.mu.Unlock()
			return nil
		}
		// started flipped while upgrading the lock; fall through to
		// publish now.
		b.mu.Unlock()
		b.mu.RLock()
	}
	// Snapshot the matching handlers so no lock is held during enqueue.
	m := b.handlers[ev.Id]
	if len(m) == 0 {
		b.mu.RUnlock()
		return nil
	}
	hs := make([]handlerRec, 0, len(m))
	for _, h := range m {
		// Channel filter: skip only when both sides set a channel and
		// they differ.
		if ev.Channel != "" && h.channel != "" && ev.Channel != h.channel {
			continue
		}
		hs = append(hs, h)
	}
	b.mu.RUnlock()
	// Enqueue one task per handler on the eventId's shard.
	// FIX: the original used `select { case q <- t: default: q <- t }`,
	// which is exactly equivalent to a plain blocking send — the default
	// branch immediately re-sent and blocked. Simplified to the direct
	// send with identical behavior.
	for _, h := range hs {
		idx := b.shardIndex(ev.Id, h.name)
		b.wg.Add(1)
		b.queues[idx] <- task{ev: cloneEvent(ev), h: h}
	}
	return nil
}
// Call publishes a request event and synchronously waits for a reply
// carrying the same EchoId.
//
// The waiter is keyed by subEvtId+echo, so handlers must respond by
// publishing an event with Id == subEvtId and the SAME EchoId (use the
// Reply helper). Returns an error on: empty eventId, a detected
// same-shard deadlock, a failed publish, a 6-second timeout, or bus
// shutdown.
func (b *bus) Call(eventId string, subEvtId string, data interface{}) (*Event, error) {
	if eventId == "" {
		return nil, errors.New("empty eventId")
	}
	// Refuse a synchronous call that would wait on the caller's own
	// worker shard — it could never be serviced.
	if b.willDeadlockOnCall(eventId) {
		return nil, fmt.Errorf("potential deadlock detected: sync Call(%s) from same worker shard", eventId)
	}
	echo := b.nextEchoId()
	wait := make(chan *Event, 1)
	b.waitMu.Lock()
	b.echoWaiter[subEvtId+echo] = wait
	b.waitMu.Unlock()
	defer func() {
		b.waitMu.Lock()
		delete(b.echoWaiter, subEvtId+echo)
		b.waitMu.Unlock()
	}()
	// FIX: the original discarded this error, so a publish that failed
	// outright (e.g. bus stopping) silently degraded into a 6s timeout.
	// Surface it immediately instead.
	if err := b.PublishEvent(&Event{Id: eventId, EchoId: echo, Data: data}); err != nil {
		return nil, err
	}
	timeout := time.After(6 * time.Second)
	select {
	case resp := <-wait:
		return resp, nil
	case <-timeout:
		return nil, errors.New("call timeout")
	case <-b.stopCh:
		return nil, errors.New("bus stopped")
	}
}
// willDeadlockOnCall reports whether a synchronous Call for eventId,
// issued from the current goroutine, would wait on the very worker
// shard that goroutine is running on — which would block forever.
func (b *bus) willDeadlockOnCall(eventId string) bool {
	gid := curGID()
	b.workerCtxMu.RLock()
	myShard, onWorker := b.workerCtx[gid]
	b.workerCtxMu.RUnlock()
	if !onWorker {
		// Not inside a bus worker: a Call can never self-block.
		return false
	}
	b.mu.RLock()
	targetShard, known := b.workerIdxes[eventId]
	b.mu.RUnlock()
	return known && myShard == targetShard
}
// curGID returns the current goroutine's numeric id, parsed from the
// first line of its stack trace ("goroutine 123 [running]:").
// It returns 0 when the header cannot be parsed.
func curGID() uint64 {
	var stack [64]byte
	n := runtime.Stack(stack[:], false)
	header := strings.TrimPrefix(string(stack[:n]), "goroutine ")
	idStr, _, found := strings.Cut(header, " ")
	if !found || idStr == "" {
		return 0
	}
	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		return 0
	}
	return id
}
// Reply publishes a response to a Call() request: it copies the
// request's Channel and EchoId onto a new event with the given id so
// the rendezvous in PublishEvent routes it back to the waiting caller.
//
// FIX: a nil req previously panicked on the req.Channel dereference;
// it is now rejected with an error.
func (b *bus) Reply(req *Event, eventId string, data interface{}) error {
	if req == nil {
		return errors.New("invalid Reply args")
	}
	return b.PublishEvent(&Event{
		Id:      eventId,
		Channel: req.Channel,
		EchoId:  req.EchoId,
		Data:    data,
	})
}
// nextEchoId produces a process-unique echo id from the current
// nanosecond timestamp plus a monotonically increasing counter.
func (b *bus) nextEchoId() string {
	seq := b.idCtr.Add(1)
	return "echo-" + strconv.FormatInt(time.Now().UnixNano(), 10) + "-" + strconv.FormatUint(seq, 10)
}
// shardIndex returns the worker shard assigned to eventId. All handlers
// of one eventId share a shard so per-event ordering is preserved;
// handlerName is retained for interface stability but unused.
//
// FIX: workerIdxes is written by Subscribe/SubscribeOnce under b.mu,
// so the original unguarded read here was a data race (PublishEvent
// calls this after releasing its read lock). Take the read lock; no
// caller holds b.mu at this point, so there is no re-entrancy issue.
func (b *bus) shardIndex(eventId, handlerName string) int {
	b.mu.RLock()
	idx := b.workerIdxes[eventId]
	b.mu.RUnlock()
	return idx
	// NOTE: hashing (eventId, handlerName) to a shard — the previous
	// commented-out FNV variant — can co-locate unrelated events on one
	// shard; a handler that then synchronously Calls an event on its own
	// shard would deadlock. Hence the per-eventId assignment plus the
	// willDeadlockOnCall guard in Call().
}
// cloneEvent returns a shallow copy of e; a nil input yields nil.
// Data is deliberately not deep-copied — it remains user-owned.
func cloneEvent(e *Event) *Event {
	if e == nil {
		return nil
	}
	dup := new(Event)
	*dup = *e
	return dup
}