From a26b58b0836bff616b0a11d0ffbdf472f540e76e Mon Sep 17 00:00:00 2001 From: aynakeya Date: Tue, 2 Sep 2025 14:31:17 +0800 Subject: [PATCH] new event bus --- core/model/liveroom.go | 2 +- pkg/eventbus/bridge.go | 9 + pkg/eventbus/bus.go | 60 +++++ pkg/eventbus/bus_impl.go | 350 ++++++++++++++++++++++++++++++ pkg/eventbus/bus_impl_test.go | 398 ++++++++++++++++++++++++++++++++++ pkg/eventbus/events.go | 65 ++++++ pkg/eventbus/go.mod | 11 + pkg/eventbus/logger.go | 16 ++ pkg/eventbus/options.go | 29 +++ pkg/eventbus/readme.md | 10 + pkg/miaosic | 2 +- 11 files changed, 950 insertions(+), 2 deletions(-) create mode 100644 pkg/eventbus/bridge.go create mode 100644 pkg/eventbus/bus.go create mode 100644 pkg/eventbus/bus_impl.go create mode 100644 pkg/eventbus/bus_impl_test.go create mode 100644 pkg/eventbus/events.go create mode 100644 pkg/eventbus/go.mod create mode 100644 pkg/eventbus/logger.go create mode 100644 pkg/eventbus/options.go create mode 100644 pkg/eventbus/readme.md diff --git a/core/model/liveroom.go b/core/model/liveroom.go index 296d161..91519aa 100644 --- a/core/model/liveroom.go +++ b/core/model/liveroom.go @@ -10,7 +10,7 @@ type LiveRoom struct { LiveRoom liveroom.LiveRoom `json:"live_room"` Config LiveRoomConfig `json:"config"` Title string `json:"title"` - Status bool `json:"-"` + Status bool `json:"status"` } func (r *LiveRoom) DisplayName() string { diff --git a/pkg/eventbus/bridge.go b/pkg/eventbus/bridge.go new file mode 100644 index 0000000..1eded0d --- /dev/null +++ b/pkg/eventbus/bridge.go @@ -0,0 +1,9 @@ +package eventbus + +// BusBridge is a minimal interface for a websocket-like JSON connection. +// Implemented by e.g. *websocket.Conn from gorilla via wrappers: +type BusBridge interface { + ReadJSON(v any) error + WriteJSON(v any) error + Close() error +} diff --git a/pkg/eventbus/bus.go b/pkg/eventbus/bus.go new file mode 100644 index 0000000..787cfec --- /dev/null +++ b/pkg/eventbus/bus.go @@ -0,0 +1,60 @@ +package eventbus + +type Event struct { + Id string + // Channel if channel is empty, then event is broadcast + Channel string + // EchoId is used for callback, if echo is not empty + // the caller is expecting a callback + EchoId string + // Data any data struct + Data interface{} +} + +// HandlerFunc event handler, should be non-blocking +type HandlerFunc func(event *Event) + +// Subscriber is client to the bus +type Subscriber interface { + // Subscribe will run this handler asynchronous when an event received; + // event will still come sequentially for each handler. which means before previous + // event has finished, the same handler should not be called. + // if channel is not empty, the handler will not receive event from other channel, however, + // broadcast event (channel is empty) will still be passed to the handler + Subscribe(channel string, eventId string, handlerName string, fn HandlerFunc) error + // SubscribeAny is Subscribe with empty channel. this function will subscribe to event from any channel + SubscribeAny(eventId string, handlerName string, fn HandlerFunc) error + // SubscribeOnce will run handler once, and delete handler internally + SubscribeOnce(eventId string, handlerName string, fn HandlerFunc) error + // Unsubscribe just remove handler for the bus + Unsubscribe(eventId string, handlerName string) error +} + +type Publisher interface { + // Publish basically a wrapper to PublishEvent + Publish(eventId string, data interface{}) error + // PublishEvent publish a event + PublishEvent(event *Event) error +} + +// Caller is special usage of a Publisher +type Caller interface { + Call(pubEvtId string, data interface{}, subEvtId string) (*Event, error) +} + +type Controller interface { + // Start will start to push events to subscribers, + // Publisher should be able to publish events before bus started + Start() error + // Wait will wait all event to be executed + Wait() error + // Stop will stop controller immediately + Stop() error +} + +type Bus interface { + Controller + Publisher + Subscriber + Caller +} diff --git a/pkg/eventbus/bus_impl.go b/pkg/eventbus/bus_impl.go new file mode 100644 index 0000000..a8c0232 --- /dev/null +++ b/pkg/eventbus/bus_impl.go @@ -0,0 +1,350 @@ +package eventbus + +import ( + "errors" + "fmt" + "hash/fnv" + "sync" + "sync/atomic" + "time" +) + +type handlerRec struct { + name string + fn HandlerFunc + once bool // if true, auto-unregister after first successful run + channel string +} + +type task struct { + ev *Event + h handlerRec +} + +// bus implements Bus. +type bus struct { + // configuration + workerCount int + queueSize int + + // workers + queues []chan task + wg sync.WaitGroup + + // lifecycle + started atomic.Bool + stopping atomic.Bool + stopOnce sync.Once + stopCh chan struct{} + drainedCh chan struct{} + + // routing & bookkeeping + mu sync.RWMutex + handlers map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec + pending []*Event // events published before Start() + + // rendezvous for Call/EchoId + waitMu sync.Mutex + echoWaiter map[string]chan *Event + // simple id source for EchoId if caller doesn't provide + idCtr atomic.Uint64 + + // logger + log Logger +} + +// New creates a new Bus. +// workerCount >= 1, queueSize >= 1. +func New(opts ...Option) Bus { + option := options{ + log: Log, + workerSize: 10, + queueSize: 100, + } + for _, opt := range opts { + opt(&option) + } + b := &bus{ + workerCount: option.workerSize, + queueSize: option.queueSize, + queues: make([]chan task, option.workerSize), + stopCh: make(chan struct{}), + drainedCh: make(chan struct{}), + handlers: make(map[string]map[string]handlerRec), + pending: make([]*Event, 0, 16), + echoWaiter: make(map[string]chan *Event), + log: option.log, + } + for i := 0; i < option.workerSize; i++ { + q := make(chan task, option.queueSize) + b.queues[i] = q + go b.workerLoop(q) + } + return b +} + +func (b *bus) workerLoop(q chan task) { + for { + select { + case <-b.stopCh: + // Drain quickly without executing tasks (immediate stop). + for { + select { + case <-q: + // drop + default: + return + } + } + case t := <-q: + func() { + defer func() { + if r := recover(); r != nil { + b.log.Printf("handler panic recovered: event=%s handler=%s panic=%v", t.ev.Id, t.h.name, r) + } + b.wg.Done() + }() + // Execute handler + t.h.fn(t.ev) + // If it was a once-handler, unregister it after execution. + if t.h.once { + _ = b.Unsubscribe(t.ev.Id, t.h.name) + } + }() + } + } +} + +func (b *bus) Start() error { + if b.started.Swap(true) { + return nil + } + // Flush pending + b.mu.Lock() + pending := b.pending + b.pending = nil + b.mu.Unlock() + + for _, ev := range pending { + err := b.PublishEvent(ev) + if err != nil { + b.log.Printf("failed to publish event: %v", err) + } + } + return nil +} + +func (b *bus) Wait() error { + // Wait for all in-flight tasks (that were queued before this call) to finish. + // If Stop() has been called, Wait returns after workers exit. + done := make(chan struct{}) + go func() { + b.wg.Wait() + close(done) + }() + select { + case <-done: + return nil + case <-b.drainedCh: + // Stopped + <-done + return nil + } +} + +func (b *bus) Stop() error { + b.stopOnce.Do(func() { + b.stopping.Store(true) + close(b.stopCh) // signal workers to stop immediately + close(b.drainedCh) // allow Wait() to proceed + }) + return nil +} + +func (b *bus) Subscribe(channel string, eventId, handlerName string, fn HandlerFunc) error { + if eventId == "" || handlerName == "" || fn == nil { + return errors.New("invalid Subscribe args") + } + b.mu.Lock() + defer b.mu.Unlock() + m := b.handlers[eventId] + if m == nil { + m = make(map[string]handlerRec) + b.handlers[eventId] = m + } + m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: channel} + return nil +} + +func (b *bus) SubscribeAny(eventId, handlerName string, fn HandlerFunc) error { + if eventId == "" || handlerName == "" || fn == nil { + return errors.New("invalid Subscribe args") + } + b.mu.Lock() + defer b.mu.Unlock() + m := b.handlers[eventId] + if m == nil { + m = make(map[string]handlerRec) + b.handlers[eventId] = m + } + m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: ""} + return nil +} + +func (b *bus) SubscribeOnce(eventId, handlerName string, fn HandlerFunc) error { + if eventId == "" || handlerName == "" || fn == nil { + return errors.New("invalid SubscribeOnce args") + } + b.mu.Lock() + defer b.mu.Unlock() + m := b.handlers[eventId] + if m == nil { + m = make(map[string]handlerRec) + b.handlers[eventId] = m + } + m[handlerName] = handlerRec{name: handlerName, fn: fn, once: true} + return nil +} + +func (b *bus) Unsubscribe(eventId, handlerName string) error { + if eventId == "" || handlerName == "" { + return errors.New("invalid Unsubscribe args") + } + b.mu.Lock() + defer b.mu.Unlock() + if m := b.handlers[eventId]; m != nil { + delete(m, handlerName) + if len(m) == 0 { + delete(b.handlers, eventId) + } + } + return nil +} + +func (b *bus) Publish(eventId string, data interface{}) error { + return b.PublishEvent(&Event{Id: eventId, Data: data}) +} + +func (b *bus) PublishEvent(ev *Event) error { + if ev == nil || ev.Id == "" { + return errors.New("invalid PublishEvent args") + } + // If stopping, drop events. + if b.stopping.Load() { + return errors.New("bus is stopping") + } + + // Rendezvous: if this looks like a reply (EchoId set) and someone is waiting, deliver. + if ev.EchoId != "" { + b.waitMu.Lock() + if ch, ok := b.echoWaiter[ev.Id+ev.EchoId]; ok { + select { + case ch <- ev: + default: + } + } + b.waitMu.Unlock() + } + + b.mu.RLock() + started := b.started.Load() + if !started { + // queue as pending (publish-before-start) + b.mu.RUnlock() + b.mu.Lock() + if !b.started.Load() { + b.pending = append(b.pending, cloneEvent(ev)) + b.mu.Unlock() + return nil + } + // started flipped while acquiring lock, fallthrough to publish now + b.mu.Unlock() + b.mu.RLock() + } + // Snapshot handlers for this event id. + m := b.handlers[ev.Id] + if len(m) == 0 { + b.mu.RUnlock() + return nil + } + + // Make a stable copy to avoid holding the lock during execution. + hs := make([]handlerRec, 0, len(m)) + for _, h := range m { + if ev.Channel != "" && h.channel != "" && ev.Channel != h.channel { + // channel not match pass + continue + } + hs = append(hs, h) + } + b.mu.RUnlock() + + // Enqueue each handler on its shard (worker) based on (eventId, handlerName). + for _, h := range hs { + idx := shardIndex(b.workerCount, ev.Id, h.name) + b.wg.Add(1) + select { + case b.queues[idx] <- task{ev: cloneEvent(ev), h: h}: + default: + // Backpressure: if shard queue is full, block (ensures ordering) but still bounded overall. + b.queues[idx] <- task{ev: cloneEvent(ev), h: h} + } + } + return nil +} + +// Call publishes a request and waits for a response event with the same EchoId. +// NOTE: Handlers should reply by publishing an Event with the SAME EchoId. +// Use Reply helper below. +func (b *bus) Call(eventId string, data interface{}, subEvtId string) (*Event, error) { + if eventId == "" { + return nil, errors.New("empty eventId") + } + echo := b.nextEchoId() + wait := make(chan *Event, 1) + + b.waitMu.Lock() + b.echoWaiter[subEvtId+echo] = wait + b.waitMu.Unlock() + defer func() { + b.waitMu.Lock() + delete(b.echoWaiter, subEvtId+echo) + b.waitMu.Unlock() + }() + + b.PublishEvent(&Event{Id: eventId, EchoId: echo, Data: data}) + + timeout := time.After(6 * time.Second) + + // No timeout specified in interface; block until reply or stop. + select { + case resp := <-wait: + return resp, nil + case <-timeout: + return nil, errors.New("call timeout") + case <-b.drainedCh: + return nil, errors.New("bus stopped") + } +} + +func (b *bus) nextEchoId() string { + x := b.idCtr.Add(1) + return fmt.Sprintf("echo-%d-%d", time.Now().UnixNano(), x) +} + +func shardIndex(n int, eventId, handlerName string) int { + h := fnv.New32a() + _, _ = h.Write([]byte(eventId)) + _, _ = h.Write([]byte{0}) + _, _ = h.Write([]byte(handlerName)) + return int(h.Sum32() % uint32(n)) +} + +func cloneEvent(e *Event) *Event { + if e == nil { + return nil + } + // shallow clone is fine; Data is user-owned + cp := *e + return &cp +} diff --git a/pkg/eventbus/bus_impl_test.go b/pkg/eventbus/bus_impl_test.go new file mode 100644 index 0000000..f654f3b --- /dev/null +++ b/pkg/eventbus/bus_impl_test.go @@ -0,0 +1,398 @@ +// generated by chatgpt +package eventbus + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestBasicLifecycle verifies the fundamental Start, Stop, and Wait operations. +func TestBasicLifecycle(t *testing.T) { + b := New(WithWorkerSize(2), WithQueueSize(10)) + + // Start should only work once. + err := b.Start() + require.NoError(t, err) + err = b.Start() + require.NoError(t, err) // Subsequent starts should be no-ops. + + // Stop should work. + err = b.Stop() + require.NoError(t, err) + + // Wait should not block after stop. + err = b.Wait() + require.NoError(t, err) +} + +// TestSubscribeAndPublish verifies the core functionality of publishing an event +// and having a subscriber receive it. +func TestSubscribeAndPublish(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(10)) + err := b.Start() + require.NoError(t, err) + defer b.Stop() + + var wg sync.WaitGroup + wg.Add(1) + + receivedData := new(atomic.Value) + + handler := func(event *Event) { + receivedData.Store(event.Data) + wg.Done() + } + + err = b.Subscribe("", "test-event", "test-handler", handler) + require.NoError(t, err) + + b.Publish("test-event", "hello world") + + wg.Wait() + require.Equal(t, "hello world", receivedData.Load()) +} + +// TestUnsubscribe ensures that a handler stops receiving events after unsubscribing. +func TestUnsubscribe(t *testing.T) { + b := New(WithWorkerSize(2), WithQueueSize(10)) + b.Start() + defer b.Stop() + + var callCount int32 + handler := func(event *Event) { + atomic.AddInt32(&callCount, 1) + } + + err := b.Subscribe("", "event-A", "handler-1", handler) + require.NoError(t, err) + + b.Publish("event-A", nil) + b.Wait() + require.Equal(t, int32(1), atomic.LoadInt32(&callCount)) + + // Unsubscribe + err = b.Unsubscribe("event-A", "handler-1") + require.NoError(t, err) + + // Publish again + b.Publish("event-A", nil) + b.Wait() // Give it a moment to ensure it's not processed + time.Sleep(50 * time.Millisecond) + require.Equal(t, int32(1), atomic.LoadInt32(&callCount), "Handler should not be called after unsubscribing") +} + +// TestSubscribeOnce verifies that a handler subscribed with SubscribeOnce +// is only called once and then automatically removed. +func TestSubscribeOnce(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(10)) + b.Start() + defer b.Stop() + + var callCount int32 + var wg sync.WaitGroup + wg.Add(1) + + handler := func(event *Event) { + atomic.AddInt32(&callCount, 1) + wg.Done() + } + + err := b.SubscribeOnce("event-once", "handler-once", handler) + require.NoError(t, err) + + // Publish twice + b.Publish("event-once", "data1") + wg.Wait() // Wait for the first event to be processed + + b.Publish("event-once", "data2") + b.Wait() + time.Sleep(50 * time.Millisecond) // Ensure no more events are processed + + require.Equal(t, int32(1), atomic.LoadInt32(&callCount)) +} + +// TestChannelSubscription validates that handlers correctly receive events based on channel matching. +func TestChannelSubscription(t *testing.T) { + b := New(WithWorkerSize(2), WithQueueSize(20)) + b.Start() + defer b.Stop() + + var receivedMu sync.Mutex + received := make(map[string]int) + + // Handler for a specific channel + err := b.Subscribe("ch1", "event-X", "handler-ch1", func(event *Event) { + receivedMu.Lock() + received["handler-ch1"]++ + receivedMu.Unlock() + }) + require.NoError(t, err) + + // Handler for another channel + err = b.Subscribe("ch2", "event-X", "handler-ch2", func(event *Event) { + receivedMu.Lock() + received["handler-ch2"]++ + receivedMu.Unlock() + }) + require.NoError(t, err) + + // Handler for any channel (broadcast) + err = b.SubscribeAny("event-X", "handler-any", func(event *Event) { + receivedMu.Lock() + received["handler-any"]++ + receivedMu.Unlock() + }) + require.NoError(t, err) + + // 1. Publish to ch1 + b.PublishEvent(&Event{Id: "event-X", Channel: "ch1"}) + b.Wait() + time.Sleep(50 * time.Millisecond) + receivedMu.Lock() + require.Equal(t, 1, received["handler-ch1"]) + require.Equal(t, 0, received["handler-ch2"]) + require.Equal(t, 1, received["handler-any"]) + receivedMu.Unlock() + + // 2. Publish to ch2 + b.PublishEvent(&Event{Id: "event-X", Channel: "ch2"}) + b.Wait() + time.Sleep(50 * time.Millisecond) + receivedMu.Lock() + require.Equal(t, 1, received["handler-ch1"]) + require.Equal(t, 1, received["handler-ch2"]) + require.Equal(t, 2, received["handler-any"]) + receivedMu.Unlock() + + // 3. Publish broadcast (empty channel) + b.PublishEvent(&Event{Id: "event-X", Channel: ""}) + b.Wait() + time.Sleep(50 * time.Millisecond) + receivedMu.Lock() + // All handlers should receive broadcast events + require.Equal(t, 2, received["handler-ch1"]) + require.Equal(t, 2, received["handler-ch2"]) + require.Equal(t, 3, received["handler-any"]) + receivedMu.Unlock() +} + +// TestPublishBeforeStart ensures that events published before the bus starts are queued +// and processed after Start() is called. +func TestPublishBeforeStart(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(10)) + + var receivedCount int32 + var wg sync.WaitGroup + wg.Add(2) + + handler := func(event *Event) { + atomic.AddInt32(&receivedCount, 1) + wg.Done() + } + + err := b.Subscribe("", "pending-event", "handler", handler) + require.NoError(t, err) + + // Publish before start + b.Publish("pending-event", "data1") + b.Publish("pending-event", "data2") + + // Handler should not have been called yet + require.Equal(t, int32(0), atomic.LoadInt32(&receivedCount)) + + // Now start the bus + b.Start() + defer b.Stop() + + wg.Wait() + require.Equal(t, int32(2), atomic.LoadInt32(&receivedCount)) +} + +// TestCall validates the request-response pattern using the Call method. +func TestCall(t *testing.T) { + b := New(WithWorkerSize(2), WithQueueSize(10)) + b.Start() + defer b.Stop() + + // Responder handler + responder := func(event *Event) { + require.NotEmpty(t, event.EchoId) + // Respond with a different event ID, but the same EchoId + b.PublishEvent(&Event{ + Id: "response-event", + EchoId: event.EchoId, + Data: fmt.Sprintf("response to %v", event.Data), + }) + } + + err := b.Subscribe("", "request-event", "responder-handler", responder) + require.NoError(t, err) + + // Make the call + resp, err := b.Call("request-event", "my-data", "response-event") + + // Verify response + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, "response-event", resp.Id) + require.Equal(t, "response to my-data", resp.Data) +} + +// TestCall_StopDuringWait checks that Call returns an error if the bus is stopped while waiting. +func TestCall_StopDuringWait(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(10)) + b.Start() + + var callErr error + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + // This call will never get a response + _, callErr = b.Call("no-reply-event", nil, "no-reply-response") + }() + + // Give the goroutine time to start waiting + time.Sleep(100 * time.Millisecond) + b.Stop() // Stop the bus + wg.Wait() + + require.Error(t, callErr) + require.Contains(t, callErr.Error(), "bus stopped") +} + +// TestPanicRecovery ensures that a panicking handler does not crash the worker. +func TestPanicRecovery(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(10)) + b.Start() + defer b.Stop() + + var wg sync.WaitGroup + wg.Add(2) // One for the panic, one for the healthy one + + panicHandler := func(event *Event) { + defer wg.Done() + if event.Data == "panic" { + panic("handler intended panic") + } + } + + healthyHandler := func(event *Event) { + defer wg.Done() + // This should still run + } + + err := b.Subscribe("", "panic-event", "panic-handler", panicHandler) + require.NoError(t, err) + err = b.Subscribe("", "panic-event", "healthy-handler", healthyHandler) + require.NoError(t, err) + + // This should not crash the test + b.Publish("panic-event", "panic") + + wg.Wait() // Will complete if both handlers finish (one by panicking, one normally) + require.True(t, true, "Test completed, indicating panic was recovered") +} + +// TestConcurrency runs many operations in parallel to check for race conditions. +func TestConcurrency(t *testing.T) { + workerCount := 4 + queueSize := 50 + b := New(WithWorkerSize(workerCount), WithQueueSize(queueSize)) + b.Start() + defer b.Stop() + + numGoroutines := 50 + eventsPerGoRoutine := 100 + var totalEventsPublished int32 + var totalEventsReceived int32 + var wg sync.WaitGroup + + // Subscriber goroutines + for i := 0; i < numGoroutines; i++ { + eventId := fmt.Sprintf("concurrent-event-%d", i%10) // 10 different events + handlerId := fmt.Sprintf("handler-%d", i) + wg.Add(1) + go func() { + defer wg.Done() + handler := func(e *Event) { + atomic.AddInt32(&totalEventsReceived, 1) + } + err := b.Subscribe("", eventId, handlerId, handler) + require.NoError(t, err) + + time.Sleep(10 * time.Millisecond) + + err = b.Unsubscribe(eventId, handlerId) + require.NoError(t, err) + }() + } + + // Setup a persistent handler to count received events + persistentHandler := func(e *Event) { + atomic.AddInt32(&totalEventsReceived, 1) + } + + for i := 0; i < 20; i++ { + eventId := fmt.Sprintf("event-for-%d", i) + err := b.Subscribe("", eventId, "persistent-handler", persistentHandler) + require.NoError(t, err) + } + + // Publisher goroutines + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(gId int) { + defer wg.Done() + for j := 0; j < eventsPerGoRoutine; j++ { + eventId := fmt.Sprintf("event-for-%d", (gId+j)%20) + b.Publish(eventId, gId) + atomic.AddInt32(&totalEventsPublished, 1) + } + }(i) + } + + wg.Wait() + b.Wait() // Wait for all published events to be processed + + fmt.Printf("Published: %d, Received: %d\n", totalEventsPublished, totalEventsReceived) + // We check that the number of received events matches published events for persistent handlers + require.Equal(t, atomic.LoadInt32(&totalEventsPublished), atomic.LoadInt32(&totalEventsReceived)) +} + +// TestInvalidArguments checks that API methods return errors on invalid input. +func TestInvalidArguments(t *testing.T) { + b := New(WithWorkerSize(1), WithQueueSize(1)) + + // Subscribe + err := b.Subscribe("", "", "name", func(e *Event) {}) + require.Error(t, err, "Subscribe should error on empty eventId") + err = b.Subscribe("", "id", "", func(e *Event) {}) + require.Error(t, err, "Subscribe should error on empty handlerName") + err = b.Subscribe("", "id", "name", nil) + require.Error(t, err, "Subscribe should error on nil handler func") + + // SubscribeAny + err = b.SubscribeAny("", "name", func(e *Event) {}) + require.Error(t, err, "SubscribeAny should error on empty eventId") + + // SubscribeOnce + err = b.SubscribeOnce("", "name", func(e *Event) {}) + require.Error(t, err, "SubscribeOnce should error on empty eventId") + + // Unsubscribe + err = b.Unsubscribe("", "name") + require.Error(t, err, "Unsubscribe should error on empty eventId") + err = b.Unsubscribe("id", "") + require.Error(t, err, "Unsubscribe should error on empty handlerName") + + // Call + _, err = b.Call("", nil, "subID") + require.Error(t, err, "Call should error on empty eventId") +} diff --git a/pkg/eventbus/events.go b/pkg/eventbus/events.go new file mode 100644 index 0000000..fadcb89 --- /dev/null +++ b/pkg/eventbus/events.go @@ -0,0 +1,65 @@ +package eventbus + +import ( + "encoding/json" + "errors" + "reflect" +) + +var DefaultMapper = NewEventsMapper() + +func UnmarshalEvent(data []byte) (*Event, error) { + return DefaultMapper.UnmarshalEvent(data) +} + +func UnmarshalEventData(eventId string, data []byte) (any, error) { + return DefaultMapper.UnmarshalEventData(eventId, data) +} + +type EventsMapper struct { + Mapping map[string]any +} + +func NewEventsMapper() *EventsMapper { + return &EventsMapper{ + Mapping: make(map[string]any), + } +} + +type untypedEvent struct { + Id string + Channel string + EchoId string + Data json.RawMessage +} + +func (m *EventsMapper) UnmarshalEvent(data []byte) (*Event, error) { + var val untypedEvent + err := json.Unmarshal(data, &val) + if err != nil { + return nil, errors.New("failed to unmarshal event: " + err.Error()) + } + actualEventData, err := m.UnmarshalEventData(val.Id, val.Data) + if err != nil { + return nil, errors.New("failed to unmarshal event data: " + err.Error()) + } + return &Event{ + Id: val.Id, + Channel: val.Channel, + EchoId: val.EchoId, + Data: actualEventData, + }, nil +} + +func (m *EventsMapper) UnmarshalEventData(eventId string, data []byte) (any, error) { + val, ok := m.Mapping[eventId] + if !ok { + return nil, errors.New("event id not found") + } + newVal := reflect.New(reflect.TypeOf(val)) + err := json.Unmarshal(data, newVal.Interface()) + if err != nil { + return nil, err + } + return newVal.Elem().Interface(), nil +} diff --git a/pkg/eventbus/go.mod b/pkg/eventbus/go.mod new file mode 100644 index 0000000..1476162 --- /dev/null +++ b/pkg/eventbus/go.mod @@ -0,0 +1,11 @@ +module eventbus + +go 1.24.4 + +require github.com/stretchr/testify v1.11.1 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/eventbus/logger.go b/pkg/eventbus/logger.go new file mode 100644 index 0000000..0157dcb --- /dev/null +++ b/pkg/eventbus/logger.go @@ -0,0 +1,16 @@ +package eventbus + +import "log" + +type Logger interface { + Printf(string, ...interface{}) +} + +type loggerImpl struct{} + +func (l loggerImpl) Printf(s string, i ...interface{}) { + log.Printf(s, i...) +} + +// Log replace with your own logger if needed +var Log Logger = &loggerImpl{} diff --git a/pkg/eventbus/options.go b/pkg/eventbus/options.go new file mode 100644 index 0000000..834d7a8 --- /dev/null +++ b/pkg/eventbus/options.go @@ -0,0 +1,29 @@ +package eventbus + +type options struct { + log Logger + workerSize int + queueSize int +} + +type Option func(*options) + +func WithLogger(logger Logger) Option { + return func(o *options) { o.log = logger } +} + +func WithWorkerSize(workerSize int) Option { + return func(o *options) { + if workerSize >= 1 { + o.workerSize = workerSize + } + } +} + +func WithQueueSize(queueSize int) Option { + return func(o *options) { + if queueSize >= 1 { + o.queueSize = queueSize + } + } +} diff --git a/pkg/eventbus/readme.md b/pkg/eventbus/readme.md new file mode 100644 index 0000000..8080576 --- /dev/null +++ b/pkg/eventbus/readme.md @@ -0,0 +1,10 @@ +# eventbus + +rewrite version of original event manager in AynaLivePlayer + +inspired by [asaskevich/EventBus](https://github.com/asaskevich/EventBus) + + +# todo + +- custom marshaller \ No newline at end of file diff --git a/pkg/miaosic b/pkg/miaosic index dd6ffb0..f834cca 160000 --- a/pkg/miaosic +++ b/pkg/miaosic @@ -1 +1 @@ -Subproject commit dd6ffb054612bf4ded1f68bb785eefd662da5d91 +Subproject commit f834cca69822df12ff851cc20ee4c8e1207916ac