package main
import (
"strings"
"sync"
"time"
abxbus "github.com/ArchiveBox/abxbus/abxbus-go"
)
// main is a self-checking program: it creates two event buses that share one
// EventBusOptions value set to EventConcurrencyGlobalSerial, emits three
// "SerialEvent" events on each bus, and panics unless (a) the shared handler
// was never observed running more than once at a time and (b) each bus
// delivered all three of its events.
func main() {
	// Number of events emitted on each bus, and the count expected back per source.
	const eventsPerBus = 3

	// Both buses are constructed from the very same options pointer.
	opts := &abxbus.EventBusOptions{EventConcurrency: abxbus.EventConcurrencyGlobalSerial}
	busA := abxbus.NewEventBus("GlobalSerialA", opts)
	busB := abxbus.NewEventBus("GlobalSerialB", opts)

	// Bookkeeping shared by every handler invocation; guarded by mu.
	var (
		mu      sync.Mutex
		active  = 0          // handler invocations currently between enter and leave
		peak    = 0          // highest value active has reached
		sources = []string{} // "source" payload of each event, in handler start order
	)

	// enter marks one invocation as started: bumps the active counter, updates
	// the high-water mark, and records which bus the event came from.
	enter := func(event *abxbus.BaseEvent) {
		mu.Lock()
		active++
		if peak < active {
			peak = active
		}
		// Unchecked assertion: panics if "source" is missing or not a string.
		sources = append(sources, event.Payload["source"].(string))
		mu.Unlock()
	}

	// leave marks one invocation as finished.
	leave := func() {
		mu.Lock()
		active--
		mu.Unlock()
	}

	// The sleep happens outside the lock so that overlapping invocations, if
	// the buses allowed them, would show up as peak > 1.
	handler := func(event *abxbus.BaseEvent) (any, error) {
		enter(event)
		time.Sleep(10 * time.Millisecond)
		leave()
		return nil, nil
	}

	busA.On("SerialEvent", "handler", handler, nil)
	busB.On("SerialEvent", "handler", handler, nil)

	// Interleave emission: one event on A, then one on B, per round.
	for order := 0; order < eventsPerBus; order++ {
		fromA := map[string]any{"order": order, "source": "a"}
		busA.Emit(abxbus.NewBaseEvent("SerialEvent", fromA))
		fromB := map[string]any{"order": order, "source": "b"}
		busB.Emit(abxbus.NewBaseEvent("SerialEvent", fromB))
	}

	busA.WaitUntilIdle(nil)
	busB.WaitUntilIdle(nil)

	// NOTE(review): the reads below take no lock; this presumably relies on
	// WaitUntilIdle returning only after every handler has finished — confirm.
	if peak != 1 {
		panic("expected global serialization")
	}

	// Each recorded source is a single letter, so counting letters in the
	// concatenation counts events per bus.
	joined := strings.Join(sources, "")
	seenA := strings.Count(joined, "a")
	seenB := strings.Count(joined, "b")
	if !(seenA == eventsPerBus && seenB == eventsPerBus) {
		panic("expected both buses to process their queues")
	}
}