3 Commits

Author SHA1 Message Date
Aynakeya
eb564402ce Merge pull request #54 from AynaLivePlayer/dev
merge dev
2025-09-14 00:23:34 +08:00
aynakeya
f6f306edc3 update miaosic dependency 2025-09-14 00:22:28 +08:00
aynakeya
a26b58b083 new event bus 2025-09-02 14:31:17 +08:00
12 changed files with 950 additions and 24 deletions

View File

@@ -6,28 +6,6 @@ Provider By Aynakeya
QQ group: 621035845
## Disclaimer
All APIs used in this project are **publicly available** on the internet and not obtained through illegal means such as
reverse engineering.
The use of this project may involve access to copyrighted content. This project does **not** own or claim any rights to
such content. **To avoid potential infringement**, all users are **required to delete any copyrighted data obtained
through this project within 24 hours.**
Any direct, indirect, special, incidental, or consequential damages (including but not limited to loss of goodwill, work
stoppage, computer failure or malfunction, or any and all other commercial damages or losses) that arise from the use or
inability to use this project are **solely the responsibility of the user**.
This project is completely free and open-source, published on GitHub for global users for **technical learning and
research purposes only**. This project does **not** guarantee compliance with local laws or regulations in all
jurisdictions.
**Using this project in violation of local laws is strictly prohibited.** Any legal consequences arising from
intentional or unintentional violations are the user's responsibility. The project maintainers accept **no liability**
for such outcomes.
## build

View File

@@ -10,7 +10,7 @@ type LiveRoom struct {
LiveRoom liveroom.LiveRoom `json:"live_room"`
Config LiveRoomConfig `json:"config"`
Title string `json:"title"`
Status bool `json:"-"`
Status bool `json:"status"`
}
func (r *LiveRoom) DisplayName() string {

9
pkg/eventbus/bridge.go Normal file
View File

@@ -0,0 +1,9 @@
package eventbus
// BusBridge is a minimal interface for a websocket-like JSON connection.
// Implemented by e.g. *websocket.Conn from gorilla via wrappers:
type BusBridge interface {
ReadJSON(v any) error
WriteJSON(v any) error
Close() error
}

60
pkg/eventbus/bus.go Normal file
View File

@@ -0,0 +1,60 @@
package eventbus
type Event struct {
Id string
// Channel if channel is empty, then event is broadcast
Channel string
// EchoId is used for callback, if echo is not empty
// the caller is expecting a callback
EchoId string
// Data any data struct
Data interface{}
}
// HandlerFunc event handler, should be non-blocking
type HandlerFunc func(event *Event)
// Subscriber is client to the bus
type Subscriber interface {
// Subscribe will run this handler asynchronous when an event received;
// event will still come sequentially for each handler. which means before previous
// event has finished, the same handler should not be called.
// if channel is not empty, the handler will not receive event from other channel, however,
// broadcast event (channel is empty) will still be passed to the handler
Subscribe(channel string, eventId string, handlerName string, fn HandlerFunc) error
// SubscribeAny is Subscribe with empty channel. this function will subscribe to event from any channel
SubscribeAny(eventId string, handlerName string, fn HandlerFunc) error
// SubscribeOnce will run handler once, and delete handler internally
SubscribeOnce(eventId string, handlerName string, fn HandlerFunc) error
// Unsubscribe just remove handler for the bus
Unsubscribe(eventId string, handlerName string) error
}
type Publisher interface {
// Publish basically a wrapper to PublishEvent
Publish(eventId string, data interface{}) error
// PublishEvent publish a event
PublishEvent(event *Event) error
}
// Caller is special usage of a Publisher
type Caller interface {
Call(pubEvtId string, data interface{}, subEvtId string) (*Event, error)
}
type Controller interface {
// Start will start to push events to subscribers,
// Publisher should be able to publish events before bus started
Start() error
// Wait will wait all event to be executed
Wait() error
// Stop will stop controller immediately
Stop() error
}
type Bus interface {
Controller
Publisher
Subscriber
Caller
}

350
pkg/eventbus/bus_impl.go Normal file
View File

@@ -0,0 +1,350 @@
package eventbus
import (
"errors"
"fmt"
"hash/fnv"
"sync"
"sync/atomic"
"time"
)
type handlerRec struct {
name string
fn HandlerFunc
once bool // if true, auto-unregister after first successful run
channel string
}
type task struct {
ev *Event
h handlerRec
}
// bus implements Bus.
type bus struct {
// configuration
workerCount int
queueSize int
// workers
queues []chan task
wg sync.WaitGroup
// lifecycle
started atomic.Bool
stopping atomic.Bool
stopOnce sync.Once
stopCh chan struct{}
drainedCh chan struct{}
// routing & bookkeeping
mu sync.RWMutex
handlers map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec
pending []*Event // events published before Start()
// rendezvous for Call/EchoId
waitMu sync.Mutex
echoWaiter map[string]chan *Event
// simple id source for EchoId if caller doesn't provide
idCtr atomic.Uint64
// logger
log Logger
}
// New creates a new Bus.
// workerCount >= 1, queueSize >= 1.
func New(opts ...Option) Bus {
option := options{
log: Log,
workerSize: 10,
queueSize: 100,
}
for _, opt := range opts {
opt(&option)
}
b := &bus{
workerCount: option.workerSize,
queueSize: option.queueSize,
queues: make([]chan task, option.workerSize),
stopCh: make(chan struct{}),
drainedCh: make(chan struct{}),
handlers: make(map[string]map[string]handlerRec),
pending: make([]*Event, 0, 16),
echoWaiter: make(map[string]chan *Event),
log: option.log,
}
for i := 0; i < option.workerSize; i++ {
q := make(chan task, option.queueSize)
b.queues[i] = q
go b.workerLoop(q)
}
return b
}
func (b *bus) workerLoop(q chan task) {
for {
select {
case <-b.stopCh:
// Drain quickly without executing tasks (immediate stop).
for {
select {
case <-q:
// drop
default:
return
}
}
case t := <-q:
func() {
defer func() {
if r := recover(); r != nil {
b.log.Printf("handler panic recovered: event=%s handler=%s panic=%v", t.ev.Id, t.h.name, r)
}
b.wg.Done()
}()
// Execute handler
t.h.fn(t.ev)
// If it was a once-handler, unregister it after execution.
if t.h.once {
_ = b.Unsubscribe(t.ev.Id, t.h.name)
}
}()
}
}
}
func (b *bus) Start() error {
if b.started.Swap(true) {
return nil
}
// Flush pending
b.mu.Lock()
pending := b.pending
b.pending = nil
b.mu.Unlock()
for _, ev := range pending {
err := b.PublishEvent(ev)
if err != nil {
b.log.Printf("failed to publish event: %v", err)
}
}
return nil
}
func (b *bus) Wait() error {
// Wait for all in-flight tasks (that were queued before this call) to finish.
// If Stop() has been called, Wait returns after workers exit.
done := make(chan struct{})
go func() {
b.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-b.drainedCh:
// Stopped
<-done
return nil
}
}
func (b *bus) Stop() error {
b.stopOnce.Do(func() {
b.stopping.Store(true)
close(b.stopCh) // signal workers to stop immediately
close(b.drainedCh) // allow Wait() to proceed
})
return nil
}
func (b *bus) Subscribe(channel string, eventId, handlerName string, fn HandlerFunc) error {
if eventId == "" || handlerName == "" || fn == nil {
return errors.New("invalid Subscribe args")
}
b.mu.Lock()
defer b.mu.Unlock()
m := b.handlers[eventId]
if m == nil {
m = make(map[string]handlerRec)
b.handlers[eventId] = m
}
m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: channel}
return nil
}
func (b *bus) SubscribeAny(eventId, handlerName string, fn HandlerFunc) error {
if eventId == "" || handlerName == "" || fn == nil {
return errors.New("invalid Subscribe args")
}
b.mu.Lock()
defer b.mu.Unlock()
m := b.handlers[eventId]
if m == nil {
m = make(map[string]handlerRec)
b.handlers[eventId] = m
}
m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: ""}
return nil
}
func (b *bus) SubscribeOnce(eventId, handlerName string, fn HandlerFunc) error {
if eventId == "" || handlerName == "" || fn == nil {
return errors.New("invalid SubscribeOnce args")
}
b.mu.Lock()
defer b.mu.Unlock()
m := b.handlers[eventId]
if m == nil {
m = make(map[string]handlerRec)
b.handlers[eventId] = m
}
m[handlerName] = handlerRec{name: handlerName, fn: fn, once: true}
return nil
}
func (b *bus) Unsubscribe(eventId, handlerName string) error {
if eventId == "" || handlerName == "" {
return errors.New("invalid Unsubscribe args")
}
b.mu.Lock()
defer b.mu.Unlock()
if m := b.handlers[eventId]; m != nil {
delete(m, handlerName)
if len(m) == 0 {
delete(b.handlers, eventId)
}
}
return nil
}
func (b *bus) Publish(eventId string, data interface{}) error {
return b.PublishEvent(&Event{Id: eventId, Data: data})
}
func (b *bus) PublishEvent(ev *Event) error {
if ev == nil || ev.Id == "" {
return errors.New("invalid PublishEvent args")
}
// If stopping, drop events.
if b.stopping.Load() {
return errors.New("bus is stopping")
}
// Rendezvous: if this looks like a reply (EchoId set) and someone is waiting, deliver.
if ev.EchoId != "" {
b.waitMu.Lock()
if ch, ok := b.echoWaiter[ev.Id+ev.EchoId]; ok {
select {
case ch <- ev:
default:
}
}
b.waitMu.Unlock()
}
b.mu.RLock()
started := b.started.Load()
if !started {
// queue as pending (publish-before-start)
b.mu.RUnlock()
b.mu.Lock()
if !b.started.Load() {
b.pending = append(b.pending, cloneEvent(ev))
b.mu.Unlock()
return nil
}
// started flipped while acquiring lock, fallthrough to publish now
b.mu.Unlock()
b.mu.RLock()
}
// Snapshot handlers for this event id.
m := b.handlers[ev.Id]
if len(m) == 0 {
b.mu.RUnlock()
return nil
}
// Make a stable copy to avoid holding the lock during execution.
hs := make([]handlerRec, 0, len(m))
for _, h := range m {
if ev.Channel != "" && h.channel != "" && ev.Channel != h.channel {
// channel not match pass
continue
}
hs = append(hs, h)
}
b.mu.RUnlock()
// Enqueue each handler on its shard (worker) based on (eventId, handlerName).
for _, h := range hs {
idx := shardIndex(b.workerCount, ev.Id, h.name)
b.wg.Add(1)
select {
case b.queues[idx] <- task{ev: cloneEvent(ev), h: h}:
default:
// Backpressure: if shard queue is full, block (ensures ordering) but still bounded overall.
b.queues[idx] <- task{ev: cloneEvent(ev), h: h}
}
}
return nil
}
// Call publishes a request and waits for a response event with the same EchoId.
// NOTE: Handlers should reply by publishing an Event with the SAME EchoId.
// Use Reply helper below.
func (b *bus) Call(eventId string, data interface{}, subEvtId string) (*Event, error) {
if eventId == "" {
return nil, errors.New("empty eventId")
}
echo := b.nextEchoId()
wait := make(chan *Event, 1)
b.waitMu.Lock()
b.echoWaiter[subEvtId+echo] = wait
b.waitMu.Unlock()
defer func() {
b.waitMu.Lock()
delete(b.echoWaiter, subEvtId+echo)
b.waitMu.Unlock()
}()
b.PublishEvent(&Event{Id: eventId, EchoId: echo, Data: data})
timeout := time.After(6 * time.Second)
// No timeout specified in interface; block until reply or stop.
select {
case resp := <-wait:
return resp, nil
case <-timeout:
return nil, errors.New("call timeout")
case <-b.drainedCh:
return nil, errors.New("bus stopped")
}
}
func (b *bus) nextEchoId() string {
x := b.idCtr.Add(1)
return fmt.Sprintf("echo-%d-%d", time.Now().UnixNano(), x)
}
func shardIndex(n int, eventId, handlerName string) int {
h := fnv.New32a()
_, _ = h.Write([]byte(eventId))
_, _ = h.Write([]byte{0})
_, _ = h.Write([]byte(handlerName))
return int(h.Sum32() % uint32(n))
}
func cloneEvent(e *Event) *Event {
if e == nil {
return nil
}
// shallow clone is fine; Data is user-owned
cp := *e
return &cp
}

View File

@@ -0,0 +1,398 @@
// generated by chatgpt
package eventbus
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestBasicLifecycle verifies the fundamental Start, Stop, and Wait operations.
func TestBasicLifecycle(t *testing.T) {
b := New(WithWorkerSize(2), WithQueueSize(10))
// Start should only work once.
err := b.Start()
require.NoError(t, err)
err = b.Start()
require.NoError(t, err) // Subsequent starts should be no-ops.
// Stop should work.
err = b.Stop()
require.NoError(t, err)
// Wait should not block after stop.
err = b.Wait()
require.NoError(t, err)
}
// TestSubscribeAndPublish verifies the core functionality of publishing an event
// and having a subscriber receive it.
func TestSubscribeAndPublish(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(10))
err := b.Start()
require.NoError(t, err)
defer b.Stop()
var wg sync.WaitGroup
wg.Add(1)
receivedData := new(atomic.Value)
handler := func(event *Event) {
receivedData.Store(event.Data)
wg.Done()
}
err = b.Subscribe("", "test-event", "test-handler", handler)
require.NoError(t, err)
b.Publish("test-event", "hello world")
wg.Wait()
require.Equal(t, "hello world", receivedData.Load())
}
// TestUnsubscribe ensures that a handler stops receiving events after unsubscribing.
func TestUnsubscribe(t *testing.T) {
b := New(WithWorkerSize(2), WithQueueSize(10))
b.Start()
defer b.Stop()
var callCount int32
handler := func(event *Event) {
atomic.AddInt32(&callCount, 1)
}
err := b.Subscribe("", "event-A", "handler-1", handler)
require.NoError(t, err)
b.Publish("event-A", nil)
b.Wait()
require.Equal(t, int32(1), atomic.LoadInt32(&callCount))
// Unsubscribe
err = b.Unsubscribe("event-A", "handler-1")
require.NoError(t, err)
// Publish again
b.Publish("event-A", nil)
b.Wait() // Give it a moment to ensure it's not processed
time.Sleep(50 * time.Millisecond)
require.Equal(t, int32(1), atomic.LoadInt32(&callCount), "Handler should not be called after unsubscribing")
}
// TestSubscribeOnce verifies that a handler subscribed with SubscribeOnce
// is only called once and then automatically removed.
func TestSubscribeOnce(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(10))
b.Start()
defer b.Stop()
var callCount int32
var wg sync.WaitGroup
wg.Add(1)
handler := func(event *Event) {
atomic.AddInt32(&callCount, 1)
wg.Done()
}
err := b.SubscribeOnce("event-once", "handler-once", handler)
require.NoError(t, err)
// Publish twice
b.Publish("event-once", "data1")
wg.Wait() // Wait for the first event to be processed
b.Publish("event-once", "data2")
b.Wait()
time.Sleep(50 * time.Millisecond) // Ensure no more events are processed
require.Equal(t, int32(1), atomic.LoadInt32(&callCount))
}
// TestChannelSubscription validates that handlers correctly receive events based on channel matching.
func TestChannelSubscription(t *testing.T) {
b := New(WithWorkerSize(2), WithQueueSize(20))
b.Start()
defer b.Stop()
var receivedMu sync.Mutex
received := make(map[string]int)
// Handler for a specific channel
err := b.Subscribe("ch1", "event-X", "handler-ch1", func(event *Event) {
receivedMu.Lock()
received["handler-ch1"]++
receivedMu.Unlock()
})
require.NoError(t, err)
// Handler for another channel
err = b.Subscribe("ch2", "event-X", "handler-ch2", func(event *Event) {
receivedMu.Lock()
received["handler-ch2"]++
receivedMu.Unlock()
})
require.NoError(t, err)
// Handler for any channel (broadcast)
err = b.SubscribeAny("event-X", "handler-any", func(event *Event) {
receivedMu.Lock()
received["handler-any"]++
receivedMu.Unlock()
})
require.NoError(t, err)
// 1. Publish to ch1
b.PublishEvent(&Event{Id: "event-X", Channel: "ch1"})
b.Wait()
time.Sleep(50 * time.Millisecond)
receivedMu.Lock()
require.Equal(t, 1, received["handler-ch1"])
require.Equal(t, 0, received["handler-ch2"])
require.Equal(t, 1, received["handler-any"])
receivedMu.Unlock()
// 2. Publish to ch2
b.PublishEvent(&Event{Id: "event-X", Channel: "ch2"})
b.Wait()
time.Sleep(50 * time.Millisecond)
receivedMu.Lock()
require.Equal(t, 1, received["handler-ch1"])
require.Equal(t, 1, received["handler-ch2"])
require.Equal(t, 2, received["handler-any"])
receivedMu.Unlock()
// 3. Publish broadcast (empty channel)
b.PublishEvent(&Event{Id: "event-X", Channel: ""})
b.Wait()
time.Sleep(50 * time.Millisecond)
receivedMu.Lock()
// All handlers should receive broadcast events
require.Equal(t, 2, received["handler-ch1"])
require.Equal(t, 2, received["handler-ch2"])
require.Equal(t, 3, received["handler-any"])
receivedMu.Unlock()
}
// TestPublishBeforeStart ensures that events published before the bus starts are queued
// and processed after Start() is called.
func TestPublishBeforeStart(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(10))
var receivedCount int32
var wg sync.WaitGroup
wg.Add(2)
handler := func(event *Event) {
atomic.AddInt32(&receivedCount, 1)
wg.Done()
}
err := b.Subscribe("", "pending-event", "handler", handler)
require.NoError(t, err)
// Publish before start
b.Publish("pending-event", "data1")
b.Publish("pending-event", "data2")
// Handler should not have been called yet
require.Equal(t, int32(0), atomic.LoadInt32(&receivedCount))
// Now start the bus
b.Start()
defer b.Stop()
wg.Wait()
require.Equal(t, int32(2), atomic.LoadInt32(&receivedCount))
}
// TestCall validates the request-response pattern using the Call method.
func TestCall(t *testing.T) {
b := New(WithWorkerSize(2), WithQueueSize(10))
b.Start()
defer b.Stop()
// Responder handler
responder := func(event *Event) {
require.NotEmpty(t, event.EchoId)
// Respond with a different event ID, but the same EchoId
b.PublishEvent(&Event{
Id: "response-event",
EchoId: event.EchoId,
Data: fmt.Sprintf("response to %v", event.Data),
})
}
err := b.Subscribe("", "request-event", "responder-handler", responder)
require.NoError(t, err)
// Make the call
resp, err := b.Call("request-event", "my-data", "response-event")
// Verify response
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, "response-event", resp.Id)
require.Equal(t, "response to my-data", resp.Data)
}
// TestCall_StopDuringWait checks that Call returns an error if the bus is stopped while waiting.
func TestCall_StopDuringWait(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(10))
b.Start()
var callErr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// This call will never get a response
_, callErr = b.Call("no-reply-event", nil, "no-reply-response")
}()
// Give the goroutine time to start waiting
time.Sleep(100 * time.Millisecond)
b.Stop() // Stop the bus
wg.Wait()
require.Error(t, callErr)
require.Contains(t, callErr.Error(), "bus stopped")
}
// TestPanicRecovery ensures that a panicking handler does not crash the worker.
func TestPanicRecovery(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(10))
b.Start()
defer b.Stop()
var wg sync.WaitGroup
wg.Add(2) // One for the panic, one for the healthy one
panicHandler := func(event *Event) {
defer wg.Done()
if event.Data == "panic" {
panic("handler intended panic")
}
}
healthyHandler := func(event *Event) {
defer wg.Done()
// This should still run
}
err := b.Subscribe("", "panic-event", "panic-handler", panicHandler)
require.NoError(t, err)
err = b.Subscribe("", "panic-event", "healthy-handler", healthyHandler)
require.NoError(t, err)
// This should not crash the test
b.Publish("panic-event", "panic")
wg.Wait() // Will complete if both handlers finish (one by panicking, one normally)
require.True(t, true, "Test completed, indicating panic was recovered")
}
// TestConcurrency runs many operations in parallel to check for race conditions.
func TestConcurrency(t *testing.T) {
workerCount := 4
queueSize := 50
b := New(WithWorkerSize(workerCount), WithQueueSize(queueSize))
b.Start()
defer b.Stop()
numGoroutines := 50
eventsPerGoRoutine := 100
var totalEventsPublished int32
var totalEventsReceived int32
var wg sync.WaitGroup
// Subscriber goroutines
for i := 0; i < numGoroutines; i++ {
eventId := fmt.Sprintf("concurrent-event-%d", i%10) // 10 different events
handlerId := fmt.Sprintf("handler-%d", i)
wg.Add(1)
go func() {
defer wg.Done()
handler := func(e *Event) {
atomic.AddInt32(&totalEventsReceived, 1)
}
err := b.Subscribe("", eventId, handlerId, handler)
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
err = b.Unsubscribe(eventId, handlerId)
require.NoError(t, err)
}()
}
// Setup a persistent handler to count received events
persistentHandler := func(e *Event) {
atomic.AddInt32(&totalEventsReceived, 1)
}
for i := 0; i < 20; i++ {
eventId := fmt.Sprintf("event-for-%d", i)
err := b.Subscribe("", eventId, "persistent-handler", persistentHandler)
require.NoError(t, err)
}
// Publisher goroutines
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(gId int) {
defer wg.Done()
for j := 0; j < eventsPerGoRoutine; j++ {
eventId := fmt.Sprintf("event-for-%d", (gId+j)%20)
b.Publish(eventId, gId)
atomic.AddInt32(&totalEventsPublished, 1)
}
}(i)
}
wg.Wait()
b.Wait() // Wait for all published events to be processed
fmt.Printf("Published: %d, Received: %d\n", totalEventsPublished, totalEventsReceived)
// We check that the number of received events matches published events for persistent handlers
require.Equal(t, atomic.LoadInt32(&totalEventsPublished), atomic.LoadInt32(&totalEventsReceived))
}
// TestInvalidArguments checks that API methods return errors on invalid input.
func TestInvalidArguments(t *testing.T) {
b := New(WithWorkerSize(1), WithQueueSize(1))
// Subscribe
err := b.Subscribe("", "", "name", func(e *Event) {})
require.Error(t, err, "Subscribe should error on empty eventId")
err = b.Subscribe("", "id", "", func(e *Event) {})
require.Error(t, err, "Subscribe should error on empty handlerName")
err = b.Subscribe("", "id", "name", nil)
require.Error(t, err, "Subscribe should error on nil handler func")
// SubscribeAny
err = b.SubscribeAny("", "name", func(e *Event) {})
require.Error(t, err, "SubscribeAny should error on empty eventId")
// SubscribeOnce
err = b.SubscribeOnce("", "name", func(e *Event) {})
require.Error(t, err, "SubscribeOnce should error on empty eventId")
// Unsubscribe
err = b.Unsubscribe("", "name")
require.Error(t, err, "Unsubscribe should error on empty eventId")
err = b.Unsubscribe("id", "")
require.Error(t, err, "Unsubscribe should error on empty handlerName")
// Call
_, err = b.Call("", nil, "subID")
require.Error(t, err, "Call should error on empty eventId")
}

65
pkg/eventbus/events.go Normal file
View File

@@ -0,0 +1,65 @@
package eventbus
import (
"encoding/json"
"errors"
"reflect"
)
var DefaultMapper = NewEventsMapper()
func UnmarshalEvent(data []byte) (*Event, error) {
return DefaultMapper.UnmarshalEvent(data)
}
func UnmarshalEventData(eventId string, data []byte) (any, error) {
return DefaultMapper.UnmarshalEventData(eventId, data)
}
type EventsMapper struct {
Mapping map[string]any
}
func NewEventsMapper() *EventsMapper {
return &EventsMapper{
Mapping: make(map[string]any),
}
}
type untypedEvent struct {
Id string
Channel string
EchoId string
Data json.RawMessage
}
func (m *EventsMapper) UnmarshalEvent(data []byte) (*Event, error) {
var val untypedEvent
err := json.Unmarshal(data, &val)
if err != nil {
return nil, errors.New("failed to unmarshal event: " + err.Error())
}
actualEventData, err := m.UnmarshalEventData(val.Id, val.Data)
if err != nil {
return nil, errors.New("failed to unmarshal event data: " + err.Error())
}
return &Event{
Id: val.Id,
Channel: val.Channel,
EchoId: val.EchoId,
Data: actualEventData,
}, nil
}
func (m *EventsMapper) UnmarshalEventData(eventId string, data []byte) (any, error) {
val, ok := m.Mapping[eventId]
if !ok {
return nil, errors.New("event id not found")
}
newVal := reflect.New(reflect.TypeOf(val))
err := json.Unmarshal(data, newVal.Interface())
if err != nil {
return nil, err
}
return newVal.Elem().Interface(), nil
}

11
pkg/eventbus/go.mod Normal file
View File

@@ -0,0 +1,11 @@
module eventbus
go 1.24.4
require github.com/stretchr/testify v1.11.1
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

16
pkg/eventbus/logger.go Normal file
View File

@@ -0,0 +1,16 @@
package eventbus
import "log"
type Logger interface {
Printf(string, ...interface{})
}
type loggerImpl struct{}
func (l loggerImpl) Printf(s string, i ...interface{}) {
log.Printf(s, i...)
}
// Log replace with your own logger if needed
var Log Logger = &loggerImpl{}

29
pkg/eventbus/options.go Normal file
View File

@@ -0,0 +1,29 @@
package eventbus
type options struct {
log Logger
workerSize int
queueSize int
}
type Option func(*options)
func WithLogger(logger Logger) Option {
return func(o *options) { o.log = logger }
}
func WithWorkerSize(workerSize int) Option {
return func(o *options) {
if workerSize >= 1 {
o.workerSize = workerSize
}
}
}
func WithQueueSize(queueSize int) Option {
return func(o *options) {
if queueSize >= 1 {
o.queueSize = queueSize
}
}
}

10
pkg/eventbus/readme.md Normal file
View File

@@ -0,0 +1,10 @@
# eventbus
rewrite version of original event manager in AynaLivePlayer
inspired by [asaskevich/EventBus](https://github.com/asaskevich/EventBus)
# todo
- custom marshaller