fix deadlock situation

aynakeya
2025-10-03 23:57:17 +08:00
parent 918e2e81b3
commit 7c3f8587f6
4 changed files with 73 additions and 65 deletions

View File

@@ -42,7 +42,7 @@ type Publisher interface {
// Caller is special usage of a Publisher
type Caller interface {
-Call(pubEvtId string, data interface{}, subEvtId string) (*Event, error)
+Call(pubEvtId string, subEvtId string, data interface{}) (*Event, error)
Reply(req *Event, eventId string, data interface{}) error
}
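For reference, a minimal sketch of the request/response flow under the reordered signature, written as if it lived in package eventbus. It assumes the concrete bus value also exposes the Caller methods (Call and Reply) shown above; the event names follow the ones used in TestCall further down.

// Hypothetical example: publish event ID first, then the reply event ID, then the payload.
func ExampleCallReply() {
	b := New(WithMaxWorkerSize(2), WithQueueSize(10))
	_ = b.Start()
	defer b.Stop()

	// Responder: reply on the event ID the caller is waiting on,
	// reusing the request's EchoId via the Reply helper.
	_ = b.Subscribe("", "request-event", "responder", func(req *Event) {
		_ = b.Reply(req, "response-event", "pong")
	})

	// Caller: blocks until the matching reply (or a bus stop) arrives.
	resp, err := b.Call("request-event", "response-event", "ping")
	if err != nil {
		panic(err)
	}
	_ = resp // carries the responder's payload and the request's EchoId
}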

View File

@@ -3,7 +3,6 @@ package eventbus
import (
"errors"
"fmt"
-"hash/fnv"
"sync"
"sync/atomic"
"time"
@@ -24,8 +23,8 @@ type task struct {
// bus implements Bus.
type bus struct {
// configuration
-workerCount int
-queueSize int
+maxWorkerSize int
+queueSize int
// workers
queues []chan task
@@ -39,9 +38,10 @@ type bus struct {
drainedCh chan struct{}
// routing & bookkeeping
-mu sync.RWMutex
-handlers map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec
-pending []*Event // events published before Start()
+mu sync.RWMutex
+handlers map[string]map[string]handlerRec // eventId -> handlerName -> handlerRec
+workerIdxes map[string]int // eventId -> workerIdx
+pending []*Event // events published before Start()
// rendezvous for Call/EchoId
waitMu sync.Mutex
@@ -57,32 +57,39 @@ type bus struct {
// workerCount >= 1, queueSize >= 1.
func New(opts ...Option) Bus {
option := options{
-log: Log,
-workerSize: 10,
-queueSize: 100,
+log: Log,
+maxWorkerSize: 10,
+queueSize: 100,
}
for _, opt := range opts {
opt(&option)
}
b := &bus{
-workerCount: option.workerSize,
-queueSize: option.queueSize,
-queues: make([]chan task, option.workerSize),
-stopCh: make(chan struct{}),
-drainedCh: make(chan struct{}),
-handlers: make(map[string]map[string]handlerRec),
-pending: make([]*Event, 0, 16),
-echoWaiter: make(map[string]chan *Event),
-log: option.log,
+maxWorkerSize: option.maxWorkerSize,
+queueSize: option.queueSize,
+queues: make([]chan task, 0, option.maxWorkerSize),
+stopCh: make(chan struct{}),
+drainedCh: make(chan struct{}),
+handlers: make(map[string]map[string]handlerRec),
+workerIdxes: make(map[string]int),
+pending: make([]*Event, 0, 16),
+echoWaiter: make(map[string]chan *Event),
+log: option.log,
}
-for i := 0; i < option.workerSize; i++ {
-q := make(chan task, option.queueSize)
-b.queues[i] = q
-go b.workerLoop(q)
+for i := 0; i < option.maxWorkerSize; i++ {
+b.addWorker()
}
return b
}
+func (b *bus) addWorker() {
+b.mu.Lock()
+q := make(chan task, b.queueSize)
+b.queues = append(b.queues, q)
+go b.workerLoop(q)
+b.mu.Unlock()
+}
func (b *bus) workerLoop(q chan task) {
for {
select {
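The rest of workerLoop is unchanged and not shown here. For orientation, a rough sketch of what each worker presumably does, based only on the fields visible above (stopCh, wg, task.ev, task.h) and on TestPanicRecovery; the real implementation may differ in its draining and logging details. The property that matters for this commit is that each worker drains its own queue serially, so a handler that blocks stalls every task sharded to that worker.

// Sketch only, not part of the diff.
func (b *bus) workerLoopSketch(q chan task) {
	for {
		select {
		case <-b.stopCh:
			return
		case t := <-q:
			func() {
				defer b.wg.Done()
				// a panicking handler must not kill the worker (see TestPanicRecovery)
				defer func() { _ = recover() }()
				t.h.fn(t.ev)
			}()
		}
	}
}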
@@ -171,24 +178,14 @@ func (b *bus) Subscribe(channel string, eventId, handlerName string, fn HandlerF
if m == nil {
m = make(map[string]handlerRec)
b.handlers[eventId] = m
+b.workerIdxes[eventId] = len(b.workerIdxes) % b.maxWorkerSize // assign a worker index for this eventId
}
m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: channel}
return nil
}
func (b *bus) SubscribeAny(eventId, handlerName string, fn HandlerFunc) error {
-if eventId == "" || handlerName == "" || fn == nil {
-return errors.New("invalid Subscribe args")
-}
-b.mu.Lock()
-defer b.mu.Unlock()
-m := b.handlers[eventId]
-if m == nil {
-m = make(map[string]handlerRec)
-b.handlers[eventId] = m
-}
-m[handlerName] = handlerRec{name: handlerName, fn: fn, channel: ""}
-return nil
+return b.Subscribe("", eventId, handlerName, fn)
}
func (b *bus) SubscribeOnce(channel, eventId, handlerName string, fn HandlerFunc) error {
@@ -201,6 +198,7 @@ func (b *bus) SubscribeOnce(channel, eventId, handlerName string, fn HandlerFunc
if m == nil {
m = make(map[string]handlerRec)
b.handlers[eventId] = m
+b.workerIdxes[eventId] = len(b.workerIdxes) % b.maxWorkerSize // assign a worker index for this eventId
}
m[handlerName] = handlerRec{channel: channel, name: handlerName, fn: fn, once: true}
return nil
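Both Subscribe and SubscribeOnce now pin a worker index the first time an eventId is seen, round-robin over the pool. A small worked example of that assignment (event names invented, maxWorkerSize = 3); note that distinct eventIds can still share a worker once the counter wraps around.

// Worker assignment for newly seen eventIds, in subscription order:
//   "order.created"  -> len(workerIdxes) = 0 -> 0 % 3 -> worker 0
//   "order.paid"     -> len(workerIdxes) = 1 -> 1 % 3 -> worker 1
//   "order.shipped"  -> len(workerIdxes) = 2 -> 2 % 3 -> worker 2
//   "order.refunded" -> len(workerIdxes) = 3 -> 3 % 3 -> worker 0 (wraps around)
// All handlers for one eventId therefore always run on the same worker.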
@@ -246,6 +244,10 @@ func (b *bus) PublishEvent(ev *Event) error {
case ch <- ev:
default:
}
+// in this case the event belongs to a pending local Call (a waiter is
+// registered for its EchoId), so we don't dispatch it to the other subscribers
+b.waitMu.Unlock()
+return nil
}
b.waitMu.Unlock()
}
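For context, a simplified sketch of the caller side of this rendezvous, assuming Call registers a buffered channel in echoWaiter keyed by the EchoId it generates and that the reply Event carries that id in a field named EchoId (neither detail is shown in this diff); the real Call presumably also cleans up the echoWaiter entry afterwards.

// Sketch only, not part of the diff: the caller side of the EchoId rendezvous.
func (b *bus) callSketch(pubEvtId, subEvtId string, data interface{}) (*Event, error) {
	echoId := b.nextEchoId()
	waiter := make(chan *Event, 1)
	b.waitMu.Lock()
	b.echoWaiter[echoId] = waiter
	b.waitMu.Unlock()
	// ... publish the request tagged with echoId and subEvtId (not shown in this diff) ...
	select {
	case resp := <-waiter:
		return resp, nil
	case <-b.stopCh:
		return nil, errors.New("bus stopped while waiting for reply")
	}
}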
@@ -285,7 +287,7 @@ func (b *bus) PublishEvent(ev *Event) error {
// Enqueue each handler on its shard (worker) based on (eventId, handlerName).
for _, h := range hs {
-idx := shardIndex(b.workerCount, ev.Id, h.name)
+idx := b.shardIndex(ev.Id, h.name)
b.wg.Add(1)
select {
case b.queues[idx] <- task{ev: cloneEvent(ev), h: h}:
@@ -300,7 +302,7 @@ func (b *bus) PublishEvent(ev *Event) error {
// Call publishes a request and waits for a response event with the same EchoId.
// NOTE: Handlers should reply by publishing an Event with the SAME EchoId.
// Use Reply helper below.
-func (b *bus) Call(eventId string, data interface{}, subEvtId string) (*Event, error) {
+func (b *bus) Call(eventId string, subEvtId string, data interface{}) (*Event, error) {
if eventId == "" {
return nil, errors.New("empty eventId")
}
@@ -345,12 +347,18 @@ func (b *bus) nextEchoId() string {
return fmt.Sprintf("echo-%d-%d", time.Now().UnixNano(), x)
}
-func shardIndex(n int, eventId, handlerName string) int {
-h := fnv.New32a()
-_, _ = h.Write([]byte(eventId))
-_, _ = h.Write([]byte{0})
-_, _ = h.Write([]byte(handlerName))
-return int(h.Sum32() % uint32(n))
+func (b *bus) shardIndex(eventId, handlerName string) int {
+// Hashing (eventId, handlerName) could route two different events to the same
+// worker. If a handler for the first event synchronously Calls the second, the
+// second event waits in that worker's queue while the first handler blocks
+// waiting for its reply, which causes a deadlock.
+// Instead, every eventId is pinned to the worker index assigned at Subscribe
+// time (a sketch of the scenario follows at the end of this file's diff).
+return b.workerIdxes[eventId] // unknown eventIds fall back to worker 0
+//h := fnv.New32a()
+//_, _ = h.Write([]byte(eventId))
+//_, _ = h.Write([]byte{0})
+//_, _ = h.Write([]byte(handlerName))
+//return int(h.Sum32() % uint32(n))
}
func cloneEvent(e *Event) *Event {
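To make the fixed deadlock concrete, a hypothetical scenario with invented event and handler names (again assuming the bus value exposes the Caller methods). Under the old fnv-based shardIndex, "step-one" and "workflow-start" could hash to the same worker, so the workflow handler would block inside Call while the "step-one" task waited behind it in the same queue. With the per-eventId assignment, "step-one" is pinned to worker 0 and "workflow-start" to worker 1, and the nested Call can be served.

// Sketch only, not part of the diff.
func deadlockScenarioSketch() {
	b := New(WithMaxWorkerSize(2), WithQueueSize(10))
	_ = b.Start()
	defer b.Stop()

	// Subscribed first -> pinned to worker 0.
	_ = b.Subscribe("", "step-one", "step-one-handler", func(req *Event) {
		_ = b.Reply(req, "step-one-done", "ok")
	})

	// Subscribed second -> pinned to worker 1; blocks here until the reply arrives.
	_ = b.Subscribe("", "workflow-start", "workflow-handler", func(e *Event) {
		_, _ = b.Call("step-one", "step-one-done", nil)
	})

	// Whatever publishes "workflow-start" (e.g. via PublishEvent) now sees the
	// nested Call complete instead of deadlocking against its own worker.
}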

View File

@@ -13,7 +13,7 @@ import (
// TestBasicLifecycle verifies the fundamental Start, Stop, and Wait operations.
func TestBasicLifecycle(t *testing.T) {
-b := New(WithWorkerSize(2), WithQueueSize(10))
+b := New(WithMaxWorkerSize(2), WithQueueSize(10))
// Start should only work once.
err := b.Start()
@@ -33,7 +33,7 @@ func TestBasicLifecycle(t *testing.T) {
// TestSubscribeAndPublish verifies the core functionality of publishing an event
// and having a subscriber receive it.
func TestSubscribeAndPublish(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(10))
+b := New(WithMaxWorkerSize(1), WithQueueSize(10))
err := b.Start()
require.NoError(t, err)
defer b.Stop()
@@ -59,7 +59,7 @@ func TestSubscribeAndPublish(t *testing.T) {
// TestUnsubscribe ensures that a handler stops receiving events after unsubscribing.
func TestUnsubscribe(t *testing.T) {
-b := New(WithWorkerSize(2), WithQueueSize(10))
+b := New(WithMaxWorkerSize(2), WithQueueSize(10))
b.Start()
defer b.Stop()
@@ -89,7 +89,7 @@ func TestUnsubscribe(t *testing.T) {
// TestSubscribeOnce verifies that a handler subscribed with SubscribeOnce
// is only called once and then automatically removed.
func TestSubscribeOnce(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(10))
+b := New(WithMaxWorkerSize(1), WithQueueSize(10))
b.Start()
defer b.Stop()
@@ -102,7 +102,7 @@ func TestSubscribeOnce(t *testing.T) {
wg.Done()
}
-err := b.SubscribeOnce("event-once", "handler-once", handler)
+err := b.SubscribeOnce("", "event-once", "handler-once", handler)
require.NoError(t, err)
// Publish twice
@@ -118,7 +118,7 @@ func TestSubscribeOnce(t *testing.T) {
// TestChannelSubscription validates that handlers correctly receive events based on channel matching.
func TestChannelSubscription(t *testing.T) {
-b := New(WithWorkerSize(2), WithQueueSize(20))
+b := New(WithMaxWorkerSize(2), WithQueueSize(20))
b.Start()
defer b.Stop()
@@ -184,7 +184,7 @@ func TestChannelSubscription(t *testing.T) {
// TestPublishBeforeStart ensures that events published before the bus starts are queued
// and processed after Start() is called.
func TestPublishBeforeStart(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(10))
+b := New(WithMaxWorkerSize(1), WithQueueSize(10))
var receivedCount int32
var wg sync.WaitGroup
@@ -215,7 +215,7 @@ func TestPublishBeforeStart(t *testing.T) {
// TestCall validates the request-response pattern using the Call method.
func TestCall(t *testing.T) {
-b := New(WithWorkerSize(2), WithQueueSize(10))
+b := New(WithMaxWorkerSize(2), WithQueueSize(10))
b.Start()
defer b.Stop()
@@ -234,7 +234,7 @@ func TestCall(t *testing.T) {
require.NoError(t, err)
// Make the call
-resp, err := b.Call("request-event", "my-data", "response-event")
+resp, err := b.Call("request-event", "response-event", "my-data")
// Verify response
require.NoError(t, err)
@@ -245,7 +245,7 @@ func TestCall(t *testing.T) {
// TestCall_StopDuringWait checks that Call returns an error if the bus is stopped while waiting.
func TestCall_StopDuringWait(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(10))
+b := New(WithMaxWorkerSize(1), WithQueueSize(10))
b.Start()
var callErr error
@@ -255,7 +255,7 @@ func TestCall_StopDuringWait(t *testing.T) {
go func() {
defer wg.Done()
// This call will never get a response
-_, callErr = b.Call("no-reply-event", nil, "no-reply-response")
+_, callErr = b.Call("no-reply-event", "no-reply-response", nil)
}()
// Give the goroutine time to start waiting
@@ -269,7 +269,7 @@ func TestCall_StopDuringWait(t *testing.T) {
// TestPanicRecovery ensures that a panicking handler does not crash the worker.
func TestPanicRecovery(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(10))
+b := New(WithMaxWorkerSize(1), WithQueueSize(10))
b.Start()
defer b.Stop()
@@ -304,7 +304,7 @@ func TestPanicRecovery(t *testing.T) {
func TestConcurrency(t *testing.T) {
workerCount := 4
queueSize := 50
-b := New(WithWorkerSize(workerCount), WithQueueSize(queueSize))
+b := New(WithMaxWorkerSize(workerCount), WithQueueSize(queueSize))
b.Start()
defer b.Stop()
@@ -368,7 +368,7 @@ func TestConcurrency(t *testing.T) {
// TestInvalidArguments checks that API methods return errors on invalid input.
func TestInvalidArguments(t *testing.T) {
-b := New(WithWorkerSize(1), WithQueueSize(1))
+b := New(WithMaxWorkerSize(1), WithQueueSize(1))
// Subscribe
err := b.Subscribe("", "", "name", func(e *Event) {})
@@ -383,7 +383,7 @@ func TestInvalidArguments(t *testing.T) {
require.Error(t, err, "SubscribeAny should error on empty eventId")
// SubscribeOnce
-err = b.SubscribeOnce("", "name", func(e *Event) {})
+err = b.SubscribeOnce("", "", "name", func(e *Event) {})
require.Error(t, err, "SubscribeOnce should error on empty eventId")
// Unsubscribe
@@ -393,6 +393,6 @@ func TestInvalidArguments(t *testing.T) {
require.Error(t, err, "Unsubscribe should error on empty handlerName")
// Call
-_, err = b.Call("", nil, "subID")
+_, err = b.Call("", "subID", nil)
require.Error(t, err, "Call should error on empty eventId")
}

View File

@@ -1,9 +1,9 @@
package eventbus
type options struct {
-log Logger
-workerSize int
-queueSize int
+log Logger
+maxWorkerSize int
+queueSize int
}
type Option func(*options)
@@ -12,10 +12,10 @@ func WithLogger(logger Logger) Option {
return func(o *options) { o.log = logger }
}
-func WithWorkerSize(workerSize int) Option {
+func WithMaxWorkerSize(maxWorkerSize int) Option {
return func(o *options) {
-if workerSize >= 1 {
-o.workerSize = workerSize
+if maxWorkerSize >= 1 {
+o.maxWorkerSize = maxWorkerSize
}
}
}
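Existing callers only need the one-line rename when constructing a bus; a minimal sketch (values arbitrary):

// Sketch of constructing a bus with the renamed option.
func newBusSketch() Bus {
	return New(
		WithMaxWorkerSize(4), // was WithWorkerSize(4): number of worker goroutines and queues
		WithQueueSize(100),   // buffered capacity of each worker's queue
	)
}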