Mirror of https://github.com/AynaLivePlayer/AynaLivePlayer.git

Commit: new event bus
@@ -10,7 +10,7 @@ type LiveRoom struct {
 	LiveRoom liveroom.LiveRoom `json:"live_room"`
 	Config   LiveRoomConfig    `json:"config"`
 	Title    string            `json:"title"`
-	Status   bool              `json:"-"`
+	Status   bool              `json:"status"`
 }

 func (r *LiveRoom) DisplayName() string {
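
The only change in this hunk is the Status struct tag: with `json:"-"` the field was never serialized, while `json:"status"` now includes it in the marshaled JSON. A minimal sketch of the difference, using hypothetical reduced structs rather than the real LiveRoom type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical before/after shapes, reduced to the Title and Status fields.
type roomBefore struct {
	Title  string `json:"title"`
	Status bool   `json:"-"` // never serialized
}

type roomAfter struct {
	Title  string `json:"title"`
	Status bool   `json:"status"` // now part of the JSON payload
}

func main() {
	oldJSON, _ := json.Marshal(roomBefore{Title: "demo", Status: true})
	newJSON, _ := json.Marshal(roomAfter{Title: "demo", Status: true})
	fmt.Println(string(oldJSON)) // {"title":"demo"}
	fmt.Println(string(newJSON)) // {"title":"demo","status":true}
}
```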

pkg/eventbus/bridge.go (new file, 9 lines)
@@ -0,0 +1,9 @@
package eventbus

// BusBridge is a minimal interface for a websocket-like JSON connection.
// It is implemented by e.g. *websocket.Conn from gorilla/websocket, directly or via thin wrappers.
type BusBridge interface {
	ReadJSON(v any) error
	WriteJSON(v any) error
	Close() error
}
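
As a hedged aside (not part of this commit): gorilla/websocket's *Conn already exposes ReadJSON, WriteJSON, and Close with matching signatures, so it should satisfy BusBridge without any wrapper. A compile-time check sketch:

```go
package main

import (
	"github.com/gorilla/websocket"
)

// Local copy of the eventbus.BusBridge interface for this standalone sketch.
type BusBridge interface {
	ReadJSON(v any) error
	WriteJSON(v any) error
	Close() error
}

// Compile-time assertion: *websocket.Conn implements BusBridge.
var _ BusBridge = (*websocket.Conn)(nil)

func main() {}
```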

pkg/eventbus/bus.go (new file, 60 lines)
@@ -0,0 +1,60 @@
package eventbus

type Event struct {
	Id string
	// Channel is the channel the event was published on; if empty, the event is a broadcast.
	Channel string
	// EchoId is used for callbacks: if it is not empty,
	// the caller is expecting a reply carrying the same EchoId.
	EchoId string
	// Data is any user-defined payload.
	Data interface{}
}

// HandlerFunc is an event handler; it should be non-blocking.
type HandlerFunc func(event *Event)

// Subscriber is the client-facing side of the bus.
type Subscriber interface {
	// Subscribe runs the handler asynchronously when an event is received.
	// Events are still delivered sequentially per handler: the same handler is not
	// invoked again before its previous invocation has finished.
	// If channel is not empty, the handler does not receive events from other channels;
	// broadcast events (empty channel) are still delivered to it.
	Subscribe(channel string, eventId string, handlerName string, fn HandlerFunc) error
	// SubscribeAny is Subscribe with an empty channel: it receives events from any channel.
	SubscribeAny(eventId string, handlerName string, fn HandlerFunc) error
	// SubscribeOnce runs the handler once, then removes it internally.
	SubscribeOnce(eventId string, handlerName string, fn HandlerFunc) error
	// Unsubscribe removes the handler from the bus.
	Unsubscribe(eventId string, handlerName string) error
}

type Publisher interface {
	// Publish is a convenience wrapper around PublishEvent.
	Publish(eventId string, data interface{}) error
	// PublishEvent publishes an event.
	PublishEvent(event *Event) error
}

// Caller is a specialized use of a Publisher: request/response over the bus.
type Caller interface {
	Call(pubEvtId string, data interface{}, subEvtId string) (*Event, error)
}

type Controller interface {
	// Start begins pushing events to subscribers.
	// Publishers may publish events before the bus is started; they are queued.
	Start() error
	// Wait blocks until all queued events have been executed.
	Wait() error
	// Stop stops the controller immediately.
	Stop() error
}

type Bus interface {
	Controller
	Publisher
	Subscriber
	Caller
}
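
A minimal usage sketch of the interfaces above, assuming the New constructor and options from bus_impl.go and options.go below; the event id, handler name, and import path (module `eventbus` from go.mod) are illustrative assumptions:

```go
package main

import (
	"fmt"

	"eventbus" // module path from pkg/eventbus/go.mod
)

func main() {
	b := eventbus.New(eventbus.WithWorkerSize(2), eventbus.WithQueueSize(32))

	// Handlers can be registered before Start; events published early are queued.
	_ = b.SubscribeAny("player.play", "log-handler", func(e *eventbus.Event) {
		fmt.Printf("play event on channel %q: %v\n", e.Channel, e.Data)
	})

	_ = b.Start()
	_ = b.Publish("player.play", "some-song-id")

	_ = b.Wait() // wait for queued events to be handled
	_ = b.Stop()
}
```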

pkg/eventbus/bus_impl.go (new file, 350 lines)
@@ -0,0 +1,350 @@
package eventbus

import (
	"errors"
	"fmt"
	"hash/fnv"
	"sync"
	"sync/atomic"
	"time"
)

type handlerRec struct {
	name    string
	fn      HandlerFunc
	once    bool // if true, auto-unregister after first successful run
	channel string
}

type task struct {
	ev *Event
	h  handlerRec
}

// bus implements Bus.
type bus struct {
	// configuration
	workerCount int
	queueSize   int

	// workers
	queues []chan task
	wg     sync.WaitGroup

	// lifecycle
	started   atomic.Bool
	stopping  atomic.Bool
	stopOnce  sync.Once
	stopCh    chan struct{}
	drainedCh chan struct{}

	// routing & bookkeeping
	mu       sync.RWMutex
	handlers map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec
	pending  []*Event                         // events published before Start()

	// rendezvous for Call/EchoId
	waitMu     sync.Mutex
	echoWaiter map[string]chan *Event
	// simple id source for EchoId if the caller doesn't provide one
	idCtr atomic.Uint64

	// logger
	log Logger
}

// New creates a new Bus.
// Worker count and queue size must be >= 1 (defaults: 10 workers, queue size 100).
func New(opts ...Option) Bus {
	option := options{
		log:        Log,
		workerSize: 10,
		queueSize:  100,
	}
	for _, opt := range opts {
		opt(&option)
	}
	b := &bus{
		workerCount: option.workerSize,
		queueSize:   option.queueSize,
		queues:      make([]chan task, option.workerSize),
		stopCh:      make(chan struct{}),
		drainedCh:   make(chan struct{}),
		handlers:    make(map[string]map[string]handlerRec),
		pending:     make([]*Event, 0, 16),
		echoWaiter:  make(map[string]chan *Event),
		log:         option.log,
	}
	for i := 0; i < option.workerSize; i++ {
		q := make(chan task, option.queueSize)
		b.queues[i] = q
		go b.workerLoop(q)
	}
	return b
}

func (b *bus) workerLoop(q chan task) {
	for {
		select {
		case <-b.stopCh:
			// Drain quickly without executing tasks (immediate stop).
			for {
				select {
				case <-q:
					// drop
				default:
					return
				}
			}
		case t := <-q:
			func() {
				defer func() {
					if r := recover(); r != nil {
						b.log.Printf("handler panic recovered: event=%s handler=%s panic=%v", t.ev.Id, t.h.name, r)
					}
					b.wg.Done()
				}()
				// Execute the handler.
				t.h.fn(t.ev)
				// If it was a once-handler, unregister it after execution.
				if t.h.once {
					_ = b.Unsubscribe(t.ev.Id, t.h.name)
				}
			}()
		}
	}
}

func (b *bus) Start() error {
	if b.started.Swap(true) {
		return nil
	}
	// Flush pending events.
	b.mu.Lock()
	pending := b.pending
	b.pending = nil
	b.mu.Unlock()

	for _, ev := range pending {
		err := b.PublishEvent(ev)
		if err != nil {
			b.log.Printf("failed to publish event: %v", err)
		}
	}
	return nil
}

func (b *bus) Wait() error {
	// Wait for all in-flight tasks (that were queued before this call) to finish.
	// If Stop() has been called, Wait returns after workers exit.
	done := make(chan struct{})
	go func() {
		b.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-b.drainedCh:
		// Stopped.
		<-done
		return nil
	}
}

func (b *bus) Stop() error {
	b.stopOnce.Do(func() {
		b.stopping.Store(true)
		close(b.stopCh)    // signal workers to stop immediately
		close(b.drainedCh) // allow Wait() to proceed
	})
	return nil
}

func (b *bus) Subscribe(channel string, eventId, handlerName string, fn HandlerFunc) error {
	if eventId == "" || handlerName == "" || fn == nil {
		return errors.New("invalid Subscribe args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	m := b.handlers[eventId]
	if m == nil {
		m = make(map[string]handlerRec)
		b.handlers[eventId] = m
	}
	m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: channel}
	return nil
}

func (b *bus) SubscribeAny(eventId, handlerName string, fn HandlerFunc) error {
	if eventId == "" || handlerName == "" || fn == nil {
		return errors.New("invalid Subscribe args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	m := b.handlers[eventId]
	if m == nil {
		m = make(map[string]handlerRec)
		b.handlers[eventId] = m
	}
	m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: ""}
	return nil
}

func (b *bus) SubscribeOnce(eventId, handlerName string, fn HandlerFunc) error {
	if eventId == "" || handlerName == "" || fn == nil {
		return errors.New("invalid SubscribeOnce args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	m := b.handlers[eventId]
	if m == nil {
		m = make(map[string]handlerRec)
		b.handlers[eventId] = m
	}
	m[handlerName] = handlerRec{name: handlerName, fn: fn, once: true}
	return nil
}

func (b *bus) Unsubscribe(eventId, handlerName string) error {
	if eventId == "" || handlerName == "" {
		return errors.New("invalid Unsubscribe args")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if m := b.handlers[eventId]; m != nil {
		delete(m, handlerName)
		if len(m) == 0 {
			delete(b.handlers, eventId)
		}
	}
	return nil
}

func (b *bus) Publish(eventId string, data interface{}) error {
	return b.PublishEvent(&Event{Id: eventId, Data: data})
}

func (b *bus) PublishEvent(ev *Event) error {
	if ev == nil || ev.Id == "" {
		return errors.New("invalid PublishEvent args")
	}
	// If stopping, drop events.
	if b.stopping.Load() {
		return errors.New("bus is stopping")
	}

	// Rendezvous: if this looks like a reply (EchoId set) and someone is waiting, deliver.
	if ev.EchoId != "" {
		b.waitMu.Lock()
		if ch, ok := b.echoWaiter[ev.Id+ev.EchoId]; ok {
			select {
			case ch <- ev:
			default:
			}
		}
		b.waitMu.Unlock()
	}

	b.mu.RLock()
	started := b.started.Load()
	if !started {
		// Queue as pending (publish-before-start).
		b.mu.RUnlock()
		b.mu.Lock()
		if !b.started.Load() {
			b.pending = append(b.pending, cloneEvent(ev))
			b.mu.Unlock()
			return nil
		}
		// started flipped while acquiring the lock; fall through and publish now.
		b.mu.Unlock()
		b.mu.RLock()
	}
	// Snapshot handlers for this event id.
	m := b.handlers[ev.Id]
	if len(m) == 0 {
		b.mu.RUnlock()
		return nil
	}

	// Make a stable copy to avoid holding the lock during execution.
	hs := make([]handlerRec, 0, len(m))
	for _, h := range m {
		if ev.Channel != "" && h.channel != "" && ev.Channel != h.channel {
			// Channel does not match; skip this handler.
			continue
		}
		hs = append(hs, h)
	}
	b.mu.RUnlock()

	// Enqueue each handler on its shard (worker) based on (eventId, handlerName).
	for _, h := range hs {
		idx := shardIndex(b.workerCount, ev.Id, h.name)
		b.wg.Add(1)
		select {
		case b.queues[idx] <- task{ev: cloneEvent(ev), h: h}:
		default:
			// Backpressure: if the shard queue is full, block (preserving ordering) while keeping memory bounded.
			b.queues[idx] <- task{ev: cloneEvent(ev), h: h}
		}
	}
	return nil
}

// Call publishes a request and waits for a response event with the same EchoId.
// NOTE: handlers should reply by publishing an Event with the SAME EchoId.
func (b *bus) Call(eventId string, data interface{}, subEvtId string) (*Event, error) {
	if eventId == "" {
		return nil, errors.New("empty eventId")
	}
	echo := b.nextEchoId()
	wait := make(chan *Event, 1)

	b.waitMu.Lock()
	b.echoWaiter[subEvtId+echo] = wait
	b.waitMu.Unlock()
	defer func() {
		b.waitMu.Lock()
		delete(b.echoWaiter, subEvtId+echo)
		b.waitMu.Unlock()
	}()

	b.PublishEvent(&Event{Id: eventId, EchoId: echo, Data: data})

	timeout := time.After(6 * time.Second)

	// Block until a reply arrives, the timeout fires, or the bus stops.
	select {
	case resp := <-wait:
		return resp, nil
	case <-timeout:
		return nil, errors.New("call timeout")
	case <-b.drainedCh:
		return nil, errors.New("bus stopped")
	}
}

func (b *bus) nextEchoId() string {
	x := b.idCtr.Add(1)
	return fmt.Sprintf("echo-%d-%d", time.Now().UnixNano(), x)
}

func shardIndex(n int, eventId, handlerName string) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(eventId))
	_, _ = h.Write([]byte{0})
	_, _ = h.Write([]byte(handlerName))
	return int(h.Sum32() % uint32(n))
}

func cloneEvent(e *Event) *Event {
	if e == nil {
		return nil
	}
	// A shallow clone is fine; Data is user-owned.
	cp := *e
	return &cp
}
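
Call expects a responder handler to publish an event that reuses the request's EchoId on the agreed reply event id. A hypothetical convenience helper for the responder side (not part of this commit, shown here only as a sketch inside package eventbus) could look like:

```go
package eventbus

// reply publishes a response to a request event received via Call.
// replyEventId must match the subEvtId the caller passed to Call,
// and the EchoId is copied from the request so the waiting Call can match it.
func reply(b Publisher, req *Event, replyEventId string, data interface{}) error {
	return b.PublishEvent(&Event{
		Id:     replyEventId,
		EchoId: req.EchoId,
		Data:   data,
	})
}
```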

pkg/eventbus/bus_impl_test.go (new file, 398 lines)
@@ -0,0 +1,398 @@
// generated by chatgpt
package eventbus

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestBasicLifecycle verifies the fundamental Start, Stop, and Wait operations.
func TestBasicLifecycle(t *testing.T) {
	b := New(WithWorkerSize(2), WithQueueSize(10))

	// Start should only work once.
	err := b.Start()
	require.NoError(t, err)
	err = b.Start()
	require.NoError(t, err) // Subsequent starts should be no-ops.

	// Stop should work.
	err = b.Stop()
	require.NoError(t, err)

	// Wait should not block after stop.
	err = b.Wait()
	require.NoError(t, err)
}

// TestSubscribeAndPublish verifies the core functionality of publishing an event
// and having a subscriber receive it.
func TestSubscribeAndPublish(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(10))
	err := b.Start()
	require.NoError(t, err)
	defer b.Stop()

	var wg sync.WaitGroup
	wg.Add(1)

	receivedData := new(atomic.Value)

	handler := func(event *Event) {
		receivedData.Store(event.Data)
		wg.Done()
	}

	err = b.Subscribe("", "test-event", "test-handler", handler)
	require.NoError(t, err)

	b.Publish("test-event", "hello world")

	wg.Wait()
	require.Equal(t, "hello world", receivedData.Load())
}

// TestUnsubscribe ensures that a handler stops receiving events after unsubscribing.
func TestUnsubscribe(t *testing.T) {
	b := New(WithWorkerSize(2), WithQueueSize(10))
	b.Start()
	defer b.Stop()

	var callCount int32
	handler := func(event *Event) {
		atomic.AddInt32(&callCount, 1)
	}

	err := b.Subscribe("", "event-A", "handler-1", handler)
	require.NoError(t, err)

	b.Publish("event-A", nil)
	b.Wait()
	require.Equal(t, int32(1), atomic.LoadInt32(&callCount))

	// Unsubscribe
	err = b.Unsubscribe("event-A", "handler-1")
	require.NoError(t, err)

	// Publish again
	b.Publish("event-A", nil)
	b.Wait() // Give it a moment to ensure it's not processed
	time.Sleep(50 * time.Millisecond)
	require.Equal(t, int32(1), atomic.LoadInt32(&callCount), "Handler should not be called after unsubscribing")
}

// TestSubscribeOnce verifies that a handler subscribed with SubscribeOnce
// is only called once and then automatically removed.
func TestSubscribeOnce(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(10))
	b.Start()
	defer b.Stop()

	var callCount int32
	var wg sync.WaitGroup
	wg.Add(1)

	handler := func(event *Event) {
		atomic.AddInt32(&callCount, 1)
		wg.Done()
	}

	err := b.SubscribeOnce("event-once", "handler-once", handler)
	require.NoError(t, err)

	// Publish twice
	b.Publish("event-once", "data1")
	wg.Wait() // Wait for the first event to be processed

	b.Publish("event-once", "data2")
	b.Wait()
	time.Sleep(50 * time.Millisecond) // Ensure no more events are processed

	require.Equal(t, int32(1), atomic.LoadInt32(&callCount))
}

// TestChannelSubscription validates that handlers correctly receive events based on channel matching.
func TestChannelSubscription(t *testing.T) {
	b := New(WithWorkerSize(2), WithQueueSize(20))
	b.Start()
	defer b.Stop()

	var receivedMu sync.Mutex
	received := make(map[string]int)

	// Handler for a specific channel
	err := b.Subscribe("ch1", "event-X", "handler-ch1", func(event *Event) {
		receivedMu.Lock()
		received["handler-ch1"]++
		receivedMu.Unlock()
	})
	require.NoError(t, err)

	// Handler for another channel
	err = b.Subscribe("ch2", "event-X", "handler-ch2", func(event *Event) {
		receivedMu.Lock()
		received["handler-ch2"]++
		receivedMu.Unlock()
	})
	require.NoError(t, err)

	// Handler for any channel (broadcast)
	err = b.SubscribeAny("event-X", "handler-any", func(event *Event) {
		receivedMu.Lock()
		received["handler-any"]++
		receivedMu.Unlock()
	})
	require.NoError(t, err)

	// 1. Publish to ch1
	b.PublishEvent(&Event{Id: "event-X", Channel: "ch1"})
	b.Wait()
	time.Sleep(50 * time.Millisecond)
	receivedMu.Lock()
	require.Equal(t, 1, received["handler-ch1"])
	require.Equal(t, 0, received["handler-ch2"])
	require.Equal(t, 1, received["handler-any"])
	receivedMu.Unlock()

	// 2. Publish to ch2
	b.PublishEvent(&Event{Id: "event-X", Channel: "ch2"})
	b.Wait()
	time.Sleep(50 * time.Millisecond)
	receivedMu.Lock()
	require.Equal(t, 1, received["handler-ch1"])
	require.Equal(t, 1, received["handler-ch2"])
	require.Equal(t, 2, received["handler-any"])
	receivedMu.Unlock()

	// 3. Publish broadcast (empty channel)
	b.PublishEvent(&Event{Id: "event-X", Channel: ""})
	b.Wait()
	time.Sleep(50 * time.Millisecond)
	receivedMu.Lock()
	// All handlers should receive broadcast events
	require.Equal(t, 2, received["handler-ch1"])
	require.Equal(t, 2, received["handler-ch2"])
	require.Equal(t, 3, received["handler-any"])
	receivedMu.Unlock()
}

// TestPublishBeforeStart ensures that events published before the bus starts are queued
// and processed after Start() is called.
func TestPublishBeforeStart(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(10))

	var receivedCount int32
	var wg sync.WaitGroup
	wg.Add(2)

	handler := func(event *Event) {
		atomic.AddInt32(&receivedCount, 1)
		wg.Done()
	}

	err := b.Subscribe("", "pending-event", "handler", handler)
	require.NoError(t, err)

	// Publish before start
	b.Publish("pending-event", "data1")
	b.Publish("pending-event", "data2")

	// Handler should not have been called yet
	require.Equal(t, int32(0), atomic.LoadInt32(&receivedCount))

	// Now start the bus
	b.Start()
	defer b.Stop()

	wg.Wait()
	require.Equal(t, int32(2), atomic.LoadInt32(&receivedCount))
}

// TestCall validates the request-response pattern using the Call method.
func TestCall(t *testing.T) {
	b := New(WithWorkerSize(2), WithQueueSize(10))
	b.Start()
	defer b.Stop()

	// Responder handler
	responder := func(event *Event) {
		require.NotEmpty(t, event.EchoId)
		// Respond with a different event ID, but the same EchoId
		b.PublishEvent(&Event{
			Id:     "response-event",
			EchoId: event.EchoId,
			Data:   fmt.Sprintf("response to %v", event.Data),
		})
	}

	err := b.Subscribe("", "request-event", "responder-handler", responder)
	require.NoError(t, err)

	// Make the call
	resp, err := b.Call("request-event", "my-data", "response-event")

	// Verify response
	require.NoError(t, err)
	require.NotNil(t, resp)
	require.Equal(t, "response-event", resp.Id)
	require.Equal(t, "response to my-data", resp.Data)
}

// TestCall_StopDuringWait checks that Call returns an error if the bus is stopped while waiting.
func TestCall_StopDuringWait(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(10))
	b.Start()

	var callErr error
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		// This call will never get a response
		_, callErr = b.Call("no-reply-event", nil, "no-reply-response")
	}()

	// Give the goroutine time to start waiting
	time.Sleep(100 * time.Millisecond)
	b.Stop() // Stop the bus
	wg.Wait()

	require.Error(t, callErr)
	require.Contains(t, callErr.Error(), "bus stopped")
}

// TestPanicRecovery ensures that a panicking handler does not crash the worker.
func TestPanicRecovery(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(10))
	b.Start()
	defer b.Stop()

	var wg sync.WaitGroup
	wg.Add(2) // One for the panic, one for the healthy one

	panicHandler := func(event *Event) {
		defer wg.Done()
		if event.Data == "panic" {
			panic("handler intended panic")
		}
	}

	healthyHandler := func(event *Event) {
		defer wg.Done()
		// This should still run
	}

	err := b.Subscribe("", "panic-event", "panic-handler", panicHandler)
	require.NoError(t, err)
	err = b.Subscribe("", "panic-event", "healthy-handler", healthyHandler)
	require.NoError(t, err)

	// This should not crash the test
	b.Publish("panic-event", "panic")

	wg.Wait() // Will complete if both handlers finish (one by panicking, one normally)
	require.True(t, true, "Test completed, indicating panic was recovered")
}

// TestConcurrency runs many operations in parallel to check for race conditions.
func TestConcurrency(t *testing.T) {
	workerCount := 4
	queueSize := 50
	b := New(WithWorkerSize(workerCount), WithQueueSize(queueSize))
	b.Start()
	defer b.Stop()

	numGoroutines := 50
	eventsPerGoRoutine := 100
	var totalEventsPublished int32
	var totalEventsReceived int32
	var wg sync.WaitGroup

	// Subscriber goroutines
	for i := 0; i < numGoroutines; i++ {
		eventId := fmt.Sprintf("concurrent-event-%d", i%10) // 10 different events
		handlerId := fmt.Sprintf("handler-%d", i)
		wg.Add(1)
		go func() {
			defer wg.Done()
			handler := func(e *Event) {
				atomic.AddInt32(&totalEventsReceived, 1)
			}
			err := b.Subscribe("", eventId, handlerId, handler)
			require.NoError(t, err)

			time.Sleep(10 * time.Millisecond)

			err = b.Unsubscribe(eventId, handlerId)
			require.NoError(t, err)
		}()
	}

	// Setup a persistent handler to count received events
	persistentHandler := func(e *Event) {
		atomic.AddInt32(&totalEventsReceived, 1)
	}

	for i := 0; i < 20; i++ {
		eventId := fmt.Sprintf("event-for-%d", i)
		err := b.Subscribe("", eventId, "persistent-handler", persistentHandler)
		require.NoError(t, err)
	}

	// Publisher goroutines
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(gId int) {
			defer wg.Done()
			for j := 0; j < eventsPerGoRoutine; j++ {
				eventId := fmt.Sprintf("event-for-%d", (gId+j)%20)
				b.Publish(eventId, gId)
				atomic.AddInt32(&totalEventsPublished, 1)
			}
		}(i)
	}

	wg.Wait()
	b.Wait() // Wait for all published events to be processed

	fmt.Printf("Published: %d, Received: %d\n", totalEventsPublished, totalEventsReceived)
	// We check that the number of received events matches published events for persistent handlers
	require.Equal(t, atomic.LoadInt32(&totalEventsPublished), atomic.LoadInt32(&totalEventsReceived))
}

// TestInvalidArguments checks that API methods return errors on invalid input.
func TestInvalidArguments(t *testing.T) {
	b := New(WithWorkerSize(1), WithQueueSize(1))

	// Subscribe
	err := b.Subscribe("", "", "name", func(e *Event) {})
	require.Error(t, err, "Subscribe should error on empty eventId")
	err = b.Subscribe("", "id", "", func(e *Event) {})
	require.Error(t, err, "Subscribe should error on empty handlerName")
	err = b.Subscribe("", "id", "name", nil)
	require.Error(t, err, "Subscribe should error on nil handler func")

	// SubscribeAny
	err = b.SubscribeAny("", "name", func(e *Event) {})
	require.Error(t, err, "SubscribeAny should error on empty eventId")

	// SubscribeOnce
	err = b.SubscribeOnce("", "name", func(e *Event) {})
	require.Error(t, err, "SubscribeOnce should error on empty eventId")

	// Unsubscribe
	err = b.Unsubscribe("", "name")
	require.Error(t, err, "Unsubscribe should error on empty eventId")
	err = b.Unsubscribe("id", "")
	require.Error(t, err, "Unsubscribe should error on empty handlerName")

	// Call
	_, err = b.Call("", nil, "subID")
	require.Error(t, err, "Call should error on empty eventId")
}

pkg/eventbus/events.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package eventbus

import (
	"encoding/json"
	"errors"
	"reflect"
)

var DefaultMapper = NewEventsMapper()

func UnmarshalEvent(data []byte) (*Event, error) {
	return DefaultMapper.UnmarshalEvent(data)
}

func UnmarshalEventData(eventId string, data []byte) (any, error) {
	return DefaultMapper.UnmarshalEventData(eventId, data)
}

type EventsMapper struct {
	Mapping map[string]any
}

func NewEventsMapper() *EventsMapper {
	return &EventsMapper{
		Mapping: make(map[string]any),
	}
}

type untypedEvent struct {
	Id      string
	Channel string
	EchoId  string
	Data    json.RawMessage
}

func (m *EventsMapper) UnmarshalEvent(data []byte) (*Event, error) {
	var val untypedEvent
	err := json.Unmarshal(data, &val)
	if err != nil {
		return nil, errors.New("failed to unmarshal event: " + err.Error())
	}
	actualEventData, err := m.UnmarshalEventData(val.Id, val.Data)
	if err != nil {
		return nil, errors.New("failed to unmarshal event data: " + err.Error())
	}
	return &Event{
		Id:      val.Id,
		Channel: val.Channel,
		EchoId:  val.EchoId,
		Data:    actualEventData,
	}, nil
}

func (m *EventsMapper) UnmarshalEventData(eventId string, data []byte) (any, error) {
	val, ok := m.Mapping[eventId]
	if !ok {
		return nil, errors.New("event id not found")
	}
	newVal := reflect.New(reflect.TypeOf(val))
	err := json.Unmarshal(data, newVal.Interface())
	if err != nil {
		return nil, err
	}
	return newVal.Elem().Interface(), nil
}
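
UnmarshalEventData looks up a prototype value in Mapping and decodes the raw JSON into a freshly allocated value of that type via reflection. A small sketch of registering a payload type and decoding an incoming message; the event id, payload struct, and import path are made-up examples:

```go
package main

import (
	"fmt"

	"eventbus" // module path from pkg/eventbus/go.mod
)

// PlayRequest is a hypothetical payload type for a "player.play" event.
type PlayRequest struct {
	SongId string `json:"song_id"`
}

func main() {
	// Register a prototype value so the mapper knows which type to decode into.
	eventbus.DefaultMapper.Mapping["player.play"] = PlayRequest{}

	raw := []byte(`{"Id":"player.play","Channel":"","EchoId":"","Data":{"song_id":"abc123"}}`)
	ev, err := eventbus.UnmarshalEvent(raw)
	if err != nil {
		panic(err)
	}
	req := ev.Data.(PlayRequest) // Data holds a concrete PlayRequest value
	fmt.Println(req.SongId)      // abc123
}
```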

pkg/eventbus/go.mod (new file, 11 lines)
@@ -0,0 +1,11 @@
module eventbus

go 1.24.4

require github.com/stretchr/testify v1.11.1

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
)

pkg/eventbus/logger.go (new file, 16 lines)
@@ -0,0 +1,16 @@
package eventbus

import "log"

type Logger interface {
	Printf(string, ...interface{})
}

type loggerImpl struct{}

func (l loggerImpl) Printf(s string, i ...interface{}) {
	log.Printf(s, i...)
}

// Log is the package-level logger; replace it with your own implementation if needed.
var Log Logger = &loggerImpl{}
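
Since the standard library's *log.Logger already has a Printf method with a matching signature, the package-level Log (or the per-bus WithLogger option) can be pointed at a prefixed logger without any adapter. A sketch under that assumption:

```go
package main

import (
	"log"
	"os"

	"eventbus" // module path from pkg/eventbus/go.mod
)

func main() {
	// *log.Logger satisfies eventbus.Logger via its Printf method.
	eventbus.Log = log.New(os.Stderr, "[eventbus] ", log.LstdFlags)

	// Alternatively, inject a logger per bus instance:
	b := eventbus.New(eventbus.WithLogger(log.New(os.Stdout, "[bus] ", 0)))
	_ = b.Start()
	_ = b.Stop()
}
```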

pkg/eventbus/options.go (new file, 29 lines)
@@ -0,0 +1,29 @@
package eventbus

type options struct {
	log        Logger
	workerSize int
	queueSize  int
}

type Option func(*options)

func WithLogger(logger Logger) Option {
	return func(o *options) { o.log = logger }
}

func WithWorkerSize(workerSize int) Option {
	return func(o *options) {
		if workerSize >= 1 {
			o.workerSize = workerSize
		}
	}
}

func WithQueueSize(queueSize int) Option {
	return func(o *options) {
		if queueSize >= 1 {
			o.queueSize = queueSize
		}
	}
}

pkg/eventbus/readme.md (new file, 10 lines)
@@ -0,0 +1,10 @@
# eventbus

A rewritten version of the original event manager in AynaLivePlayer.

Inspired by [asaskevich/EventBus](https://github.com/asaskevich/EventBus).

# todo

- custom marshaller

Submodule pkg/miaosic updated: dd6ffb0546...f834cca698