This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
go-pubbing is a production-grade, single-instance in-memory pub/sub system for Go with hierarchical topics, wildcard subscriptions, and optional statefulness. The motto is: Excellence. Always.
COMPLETED - Phase 1 (Exact Topic Matching) - First dev cycle
All core infrastructure and fundamental features are implemented and tested:
- Broker: Full lifecycle (New, Publish, Subscribe, Shutdown)
- Topic System: Hierarchical topics with exact matching (no wildcards yet)
- Subscriptions: Context-aware, handler-based, automatic cleanup
- Messages: Immutable with defensive copies, sequence numbers, timestamps, headers
- Options: Functional options for broker, publish, and subscribe
- Logging: Injectable logger interface with Zap adapter
- Error Handling: Rich error types with context
- Testing: 79 tests, 100% pass, race detection enabled
- Performance: 6M+ topic matches/sec, 157ns/op
- Documentation: Comprehensive README, 11 runnable examples
- Test coverage, main package: 67.0%
- Test coverage, adapters: 100.0%
- Test coverage, internal: 69.6%
- Test coverage, topic system: 98.7%
- Phase 2: Wildcard patterns (* and >)
- Phase 3: Message retention and replay
- Phase 4: Acknowledgment and delivery guarantees
- Phase 5: Queue groups and load balancing
- Single-threaded broker loop: No locks, all mutable state owned by one goroutine ✅
- Safety first: No panics (except init), recover from user panics, no goroutine leaks ✅
- Zero external dependencies: Pure Go (except testing dependencies) ✅
- Command pattern: All state changes via commands sent to broker loop ✅
- Future-proof: Architecture supports multi-instance extension ✅
NO LITERAL STRINGS IN CODE - Always use CONSTANTS for:
- Error messages
- Configuration keys
- Topic patterns
- Field names in maps/structs
- Any string that appears in code logic
Bad:
if headers["content-type"] == "application/json" { ... }
Good:
const (
HeaderContentType = "content-type"
MimeTypeJSON = "application/json"
)
if headers[HeaderContentType] == MimeTypeJSON { ... }
User Code → Broker API (thread-safe) → Command Channels → Central Broker Loop (single goroutine)
↓
Topic Trie + Retention + Subscribers
↓
Subscriber Goroutines (one per subscription)
Key principle: Broker loop is single-threaded and owns ALL mutable state. No locks needed in loop. External API is thread-safe via channels.
- Broker (broker.go, broker_internal.go)
  - Public API: Publish(), Subscribe(), Request(), Shutdown()
  - Internal loop: Processes commands sequentially
  - Uses channels for communication (cmdCh, stopCh, stoppedCh)
- Topic Trie (topic/trie.go)
  - Efficient O(segments) wildcard matching
  - Supports * (single segment) and > (multi-level)
  - Pre-structured at subscribe time, not publish time
  - RWMutex because reads >> writes
- Retention Store (retention/store.go, retention/ringbuffer.go)
  - Per-topic ring buffers
  - Lazy allocation (only when messages published)
  - Policy inheritance: specific > wildcard > global
  - Bounded memory by design
- Subscription (subscription.go)
  - Each has own goroutine for isolation
  - Supports callback OR channel (not both)
  - Handles ack/nack, redelivery, panic recovery
- Message (message.go)
  - ID: NanoID (not UUID - shorter)
  - Sequence: uint64 (global monotonic via atomic)
  - Payload: []byte (zero-copy)
  - Headers: map[string]string
- Commands (internal/command.go)
  - publishCmd, subscribeCmd, unsubscribeCmd, ackCmd, shutdownCmd, statsCmd
  - Synchronous commands use result channels
  - Async commands (ack) are fire-and-forget
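The command pattern described above can be sketched roughly as follows. This is a minimal illustration, not the repository's implementation: the names brokerState, command, miniBroker, and the Acked() helper are hypothetical, and shutdown is simplified to closing cmdCh rather than using a separate stopCh.

```go
package main

import "fmt"

// brokerState is owned exclusively by the loop goroutine, so it needs no locks.
// (Hypothetical type, for illustration only.)
type brokerState struct {
	subscribers map[string]int
	acked       int
}

// command is anything the loop can apply to its private state.
type command interface {
	apply(s *brokerState)
}

// subscribeCmd is synchronous: the caller blocks on the result channel.
type subscribeCmd struct {
	topic  string
	result chan int
}

func (c subscribeCmd) apply(s *brokerState) {
	s.subscribers[c.topic]++
	c.result <- s.subscribers[c.topic]
}

// ackCmd is asynchronous: it carries no result channel (fire-and-forget).
type ackCmd struct{}

func (ackCmd) apply(s *brokerState) { s.acked++ }

// statsCmd is synchronous and reports how many acks the loop has seen.
type statsCmd struct{ result chan int }

func (c statsCmd) apply(s *brokerState) { c.result <- s.acked }

type miniBroker struct {
	cmdCh     chan command
	stoppedCh chan struct{}
}

func newMiniBroker() *miniBroker {
	b := &miniBroker{cmdCh: make(chan command, 16), stoppedCh: make(chan struct{})}
	go b.loop()
	return b
}

// loop processes commands one at a time; it is the only goroutine touching state.
func (b *miniBroker) loop() {
	defer close(b.stoppedCh)
	s := &brokerState{subscribers: make(map[string]int)}
	for cmd := range b.cmdCh {
		cmd.apply(s)
	}
}

// Subscribe sends a command and waits for the loop's answer.
func (b *miniBroker) Subscribe(topic string) int {
	result := make(chan int, 1)
	b.cmdCh <- subscribeCmd{topic: topic, result: result}
	return <-result
}

// Ack enqueues the command and returns without waiting.
func (b *miniBroker) Ack() { b.cmdCh <- ackCmd{} }

func (b *miniBroker) Acked() int {
	result := make(chan int, 1)
	b.cmdCh <- statsCmd{result: result}
	return <-result
}

// Shutdown stops the loop and waits for it to exit.
// Simplification: sending after Shutdown would panic in this sketch.
func (b *miniBroker) Shutdown() {
	close(b.cmdCh)
	<-b.stoppedCh
}

func main() {
	b := newMiniBroker()
	fmt.Println("subscribers:", b.Subscribe("mission.state"))
	b.Ack()
	fmt.Println("acked:", b.Acked())
	b.Shutdown()
}
```

Because the command channel is FIFO and the loop is sequential, a synchronous command issued after an async one from the same goroutine observes the async command's effect.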
- Segments separated by . (dot)
- No empty segments: mission..state is INVALID
- No leading/trailing dots: .mission or mission. is INVALID
- * matches exactly one segment: mission.*.change.*
- > matches one or more segments, must be last: mission.>
- Only valid characters: a-zA-Z0-9-_
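The rules above could be checked with a validator along these lines. This is a sketch under assumptions: validatePattern, the error variables, and the constant names are hypothetical and are not taken from topic/validation.go. It also assumes that a wildcard must occupy a whole segment.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const (
	segmentSeparator = "."
	wildcardSingle   = "*"
	wildcardMulti    = ">"
	emptySegment     = ""
)

var (
	errEmptySegment = errors.New("pattern contains an empty segment")
	errInvalidChar  = errors.New("segment contains an invalid character")
	errMultiNotLast = errors.New("multi-level wildcard must be the last segment")
)

// validSegmentChar reports whether r is in a-zA-Z0-9-_.
func validSegmentChar(r rune) bool {
	return (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') ||
		(r >= '0' && r <= '9') || r == '-' || r == '_'
}

// validatePattern checks a subscription pattern against the rules listed above.
func validatePattern(pattern string) error {
	segments := strings.Split(pattern, segmentSeparator)
	for i, seg := range segments {
		switch seg {
		case emptySegment:
			// Covers "a..b" as well as leading and trailing dots.
			return errEmptySegment
		case wildcardSingle:
			continue
		case wildcardMulti:
			if i != len(segments)-1 {
				return errMultiNotLast
			}
			continue
		}
		for _, r := range seg {
			if !validSegmentChar(r) {
				return errInvalidChar
			}
		}
	}
	return nil
}

func main() {
	for _, p := range []string{"mission.*.change.*", "mission.>", "mission..state", "mission.>.state"} {
		fmt.Printf("%-20s %v\n", p, validatePattern(p))
	}
}
```

Splitting on the separator makes leading dots, trailing dots, and doubled dots all surface as an empty segment, so one check covers three of the rules.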
# Initialize module (if not done)
go mod init github.com/itsatony/go-pubbing
# Get dependencies (minimal - only test deps)
go mod tidy
# Run all tests
go test ./...
# Run with race detector (ALWAYS use during development)
go test -race ./...
# Run with coverage
go test -cover ./...
# Run specific test
go test -run TestBrokerPublishSubscribe ./...
# Run benchmarks
go test -bench=. -benchmem ./...
# Run benchmarks for specific component
go test -bench=BenchmarkTopicMatching ./topic/
# Vet code
go vet ./...
# Format code
go fmt ./...
# Build (library, no binary)
go build ./...
# CPU profile
go test -cpuprofile=cpu.prof -bench=BenchmarkHighThroughput
go tool pprof cpu.prof
# Memory profile
go test -memprofile=mem.prof -bench=BenchmarkPublish
go tool pprof mem.prof
- Use sync.Pool for Message objects to reduce GC pressure
- Limit channel buffer sizes to prevent unbounded growth
- Clean up closed subscriptions promptly
- Pre-allocate slices when size is known
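As an illustration of the pooling guideline above, here is a minimal sync.Pool sketch. The pooledMessage type and the acquireMessage/releaseMessage helpers are hypothetical stand-ins, not the contents of internal/pool.go.

```go
package main

import (
	"fmt"
	"sync"
)

// pooledMessage is a hypothetical, simplified message shape.
type pooledMessage struct {
	Topic   string
	Payload []byte
	Headers map[string]string
}

// messagePool hands out reusable messages to reduce allocations.
var messagePool = sync.Pool{
	New: func() any {
		return &pooledMessage{Headers: make(map[string]string, 4)}
	},
}

// acquireMessage takes a message from the pool and fills it in.
// The payload is copied into the message's own (reused) backing array.
func acquireMessage(topic string, payload []byte) *pooledMessage {
	m := messagePool.Get().(*pooledMessage)
	m.Topic = topic
	m.Payload = append(m.Payload[:0], payload...)
	return m
}

// releaseMessage resets a message and returns it to the pool.
// The caller must not touch m afterwards.
func releaseMessage(m *pooledMessage) {
	m.Topic = ""
	m.Payload = m.Payload[:0]
	for k := range m.Headers {
		delete(m.Headers, k)
	}
	messagePool.Put(m)
}

func main() {
	m := acquireMessage("mission.state", []byte("ready"))
	m.Headers["trace-id"] = "123"
	fmt.Println(m.Topic, string(m.Payload), len(m.Headers))
	releaseMessage(m)
}
```

One design caveat: a pooled message may only be released once no subscriber still holds a reference to it, otherwise reuse would break the immutability guarantee described elsewhere in this file.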
- Define specific error types in errors.go
- Use errors.New() for simple errors
- Use custom error types for rich context (see PubSubError in plan)
- Never panic in user-facing code (only in init/New)
- Wrap errors with operation context
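A rich error type that wraps a sentinel with operation context might look like the sketch below. The plan mentions PubSubError by name only, so the fields shown here (Op, Topic, Err) and the publish helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	opPublish          = "publish"
	errMsgBrokerClosed = "broker is closed"
)

// ErrBrokerClosed is a simple sentinel error created with errors.New.
var ErrBrokerClosed = errors.New(errMsgBrokerClosed)

// PubSubError adds operation context to an underlying error.
// The field set is hypothetical; the real type may differ.
type PubSubError struct {
	Op    string
	Topic string
	Err   error
}

func (e *PubSubError) Error() string {
	return fmt.Sprintf("%s %q: %v", e.Op, e.Topic, e.Err)
}

// Unwrap lets errors.Is and errors.As see the wrapped error.
func (e *PubSubError) Unwrap() error { return e.Err }

// publish is a stand-in that fails when the broker is closed.
func publish(topic string, closed bool) error {
	if closed {
		return &PubSubError{Op: opPublish, Topic: topic, Err: ErrBrokerClosed}
	}
	return nil
}

func main() {
	err := publish("mission.state", true)
	fmt.Println(err)
	fmt.Println(errors.Is(err, ErrBrokerClosed))

	var pe *PubSubError
	if errors.As(err, &pe) {
		fmt.Println(pe.Op, pe.Topic)
	}
}
```

Callers can then branch on the sentinel with errors.Is while logs still carry the operation and topic.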
- Broker.Publish(): Safe for concurrent use
- Broker.Subscribe(): Safe for concurrent use
- Subscription.Unsubscribe(): Safe for concurrent use
- Message: Immutable after creation (except ack/nack methods)
- Internal state: Protected by single-threaded broker loop (no locks!)
- Test files parallel the source files: broker_test.go, topic/trie_test.go
- Use testutil/ for shared test helpers
- Always test with -race flag
- Write benchmarks for hot paths
- Use examples_test.go for godoc examples
- Test categories:
  - Basic functionality (pub/sub, unsub)
  - Topic matching (wildcards, patterns)
  - Statefulness (retention, replay, durable)
  - Reliability (ack/nack, redelivery, timeouts)
  - Queue groups (load balancing)
  - Performance (backpressure, slow consumers)
  - Concurrency (race conditions, deadlocks)
  - Shutdown (graceful, timeout)
go-pubbing/
├── broker.go # Public API
├── broker_internal.go # Broker loop implementation
├── options.go # Functional options
├── message.go # Message type
├── subscription.go # Subscription type
├── errors.go # Error types
├── hooks.go # Observability hooks
├── stats.go # Statistics
├── topic/
│ ├── trie.go # Topic matching
│ └── validation.go # Topic/pattern validation
├── retention/
│ ├── store.go # Retention store
│ ├── ringbuffer.go # Ring buffer
│ └── policy.go # Retention policies
├── internal/
│ ├── command.go # Command types
│ ├── nanoid.go # ID generation
│ └── pool.go # Object pooling
└── testutil/
    └── testutil.go # Test helpers
- Recursive descent through trie
- Try exact match → single wildcard (*) → multi-wildcard (>)
- Multi-wildcard matches current and ALL remaining segments
- Pre-compiled patterns at subscribe time for efficiency
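The matching order above (exact, then *, then >) can be sketched as a small recursive trie walk. This is an illustrative sketch, not the code in topic/trie.go: trieNode, insert, match, and matchTopic are hypothetical names, subscribers are represented by plain string IDs, and topics are assumed to be validated already.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

const (
	segmentSeparator = "."
	wildcardSingle   = "*"
	wildcardMulti    = ">"
)

// trieNode holds child segments and the subscribers whose pattern ends here.
type trieNode struct {
	children map[string]*trieNode
	subs     []string
}

func newTrieNode() *trieNode {
	return &trieNode{children: make(map[string]*trieNode)}
}

// insert structures the pattern into the trie at subscribe time.
func (n *trieNode) insert(pattern, subID string) {
	cur := n
	for _, seg := range strings.Split(pattern, segmentSeparator) {
		next, ok := cur.children[seg]
		if !ok {
			next = newTrieNode()
			cur.children[seg] = next
		}
		cur = next
	}
	cur.subs = append(cur.subs, subID)
}

// match descends recursively: exact child, then "*", then ">".
func (n *trieNode) match(segments []string, out *[]string) {
	if len(segments) == 0 {
		*out = append(*out, n.subs...)
		return
	}
	if child, ok := n.children[segments[0]]; ok {
		child.match(segments[1:], out)
	}
	if child, ok := n.children[wildcardSingle]; ok {
		child.match(segments[1:], out)
	}
	if child, ok := n.children[wildcardMulti]; ok {
		// ">" consumes the current segment and all remaining ones.
		*out = append(*out, child.subs...)
	}
}

// matchTopic returns the sorted IDs of all subscribers matching topic.
func matchTopic(root *trieNode, topic string) []string {
	var out []string
	root.match(strings.Split(topic, segmentSeparator), &out)
	sort.Strings(out)
	return out
}

func main() {
	root := newTrieNode()
	root.insert("mission.state", "exact")
	root.insert("mission.*", "single")
	root.insert("mission.>", "multi")
	fmt.Println(matchTopic(root, "mission.state"))
}
```

Because ">" is only checked while at least one segment remains, a pattern such as mission.> does not match the bare topic mission, which agrees with the "one or more segments" rule.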
- Fixed capacity circular buffer per topic
- Overwrites oldest when full
- Tracks start sequence (oldest) and end sequence (next write)
- O(1) push and range retrieval
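A fixed-capacity ring buffer with start and end sequences could be sketched as below. The names ringBuffer, push, and rangeFrom are hypothetical, items are plain strings, and sequences here are local to the buffer (starting at 0) rather than the global message sequence, which is a simplification.

```go
package main

import "fmt"

// ringBuffer keeps the most recent len(items) entries.
type ringBuffer struct {
	items []string
	start uint64 // sequence of the oldest retained item
	end   uint64 // sequence the next push will receive
}

func newRingBuffer(capacity int) *ringBuffer {
	if capacity < 1 {
		capacity = 1 // guard against modulo by zero
	}
	return &ringBuffer{items: make([]string, capacity)}
}

// push stores item in O(1), overwriting the oldest entry when full,
// and returns the sequence assigned to the item.
func (r *ringBuffer) push(item string) uint64 {
	seq := r.end
	r.items[seq%uint64(len(r.items))] = item
	r.end++
	if r.end-r.start > uint64(len(r.items)) {
		r.start++ // the oldest entry was just overwritten
	}
	return seq
}

// rangeFrom returns retained items with sequence >= from, oldest first.
// Locating the first slot is O(1); copying is linear in the result size.
func (r *ringBuffer) rangeFrom(from uint64) []string {
	if from < r.start {
		from = r.start
	}
	if from >= r.end {
		return nil
	}
	out := make([]string, 0, int(r.end-from))
	for seq := from; seq < r.end; seq++ {
		out = append(out, r.items[seq%uint64(len(r.items))])
	}
	return out
}

func main() {
	rb := newRingBuffer(3)
	for _, v := range []string{"a", "b", "c", "d", "e"} {
		rb.push(v)
	}
	fmt.Println(rb.start, rb.end, rb.rangeFrom(0))
}
```

Keeping start and end as ever-growing sequences, and deriving slot indexes by modulo, avoids separate head/tail bookkeeping and makes replay-from-sequence a simple clamp.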
- Group subscribers by queue group name
- Fan-out to all non-queue subscribers
- Round-robin within each queue group (simple: msgSequence % len(members))
- Future: Consider least-loaded or weighted distribution
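The dispatch rule above, fan-out to plain subscribers plus one member per queue group, might be sketched like this. The subscriber struct and selectRecipients function are hypothetical; an empty queueGroup is assumed to mean a non-queue subscriber.

```go
package main

import "fmt"

// subscriber is a hypothetical, simplified view of a matched subscription.
type subscriber struct {
	id         string
	queueGroup string // empty means a plain (non-queue) subscriber
}

// selectRecipients returns every plain subscriber plus exactly one member
// of each queue group, chosen by msgSequence % len(members).
func selectRecipients(matched []subscriber, msgSequence uint64) []string {
	var recipients []string
	groups := make(map[string][]string)
	var groupOrder []string // keeps group iteration deterministic

	for _, s := range matched {
		if s.queueGroup == "" {
			recipients = append(recipients, s.id)
			continue
		}
		if _, seen := groups[s.queueGroup]; !seen {
			groupOrder = append(groupOrder, s.queueGroup)
		}
		groups[s.queueGroup] = append(groups[s.queueGroup], s.id)
	}

	for _, name := range groupOrder {
		members := groups[name]
		recipients = append(recipients, members[msgSequence%uint64(len(members))])
	}
	return recipients
}

func main() {
	matched := []subscriber{
		{id: "audit"},
		{id: "worker-1", queueGroup: "workers"},
		{id: "worker-2", queueGroup: "workers"},
	}
	for seq := uint64(0); seq < 3; seq++ {
		fmt.Println(seq, selectRecipients(matched, seq))
	}
}
```

Note that with a global sequence the modulo is only evenly distributed if every message reaches the group; when sequences are skipped for a given group, the split can skew, which is one motivation for the least-loaded or weighted options mentioned above.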
- Generate unique inbox: _INBOX.<nanoID>
- Create ephemeral subscription to inbox
- Publish with ReplyTo header set to inbox
- Wait for reply with timeout
- Auto-unsubscribe when done
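The request/reply steps above can be sketched against a toy in-memory bus. Everything here is a stand-in: toyBus is not the broker (it uses a mutex and exact-topic handlers purely to stay short), the inbox suffix is random hex rather than a NanoID, and request, newInbox, and demoTimeout are hypothetical names.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
	"time"
)

const (
	inboxPrefix = "_INBOX."
	demoTimeout = time.Second
)

var errRequestTimeout = errors.New("request timed out")

type message struct {
	Payload string
	ReplyTo string
}

// toyBus is a minimal stand-in for the broker: one handler per exact topic.
type toyBus struct {
	mu   sync.Mutex
	subs map[string]func(message)
}

func newToyBus() *toyBus { return &toyBus{subs: make(map[string]func(message))} }

func (b *toyBus) subscribe(topic string, h func(message)) (unsubscribe func()) {
	b.mu.Lock()
	b.subs[topic] = h
	b.mu.Unlock()
	return func() {
		b.mu.Lock()
		delete(b.subs, topic)
		b.mu.Unlock()
	}
}

func (b *toyBus) publish(topic string, m message) {
	b.mu.Lock()
	h := b.subs[topic]
	b.mu.Unlock()
	if h != nil {
		go h(m) // deliver on a separate goroutine, like a subscriber goroutine
	}
}

// newInbox builds a unique reply topic (random hex instead of a NanoID).
func newInbox() string {
	buf := make([]byte, 8)
	_, _ = rand.Read(buf)
	return inboxPrefix + hex.EncodeToString(buf)
}

// request follows the steps above: inbox, ephemeral subscription,
// publish with ReplyTo, wait with timeout, auto-unsubscribe.
func request(b *toyBus, topic, payload string, timeout time.Duration) (string, error) {
	inbox := newInbox()
	replies := make(chan string, 1)
	unsubscribe := b.subscribe(inbox, func(m message) {
		select {
		case replies <- m.Payload:
		default: // ignore extra replies
		}
	})
	defer unsubscribe()

	b.publish(topic, message{Payload: payload, ReplyTo: inbox})

	select {
	case reply := <-replies:
		return reply, nil
	case <-time.After(timeout):
		return "", errRequestTimeout
	}
}

func main() {
	bus := newToyBus()
	bus.subscribe("echo", func(m message) {
		bus.publish(m.ReplyTo, message{Payload: "re:" + m.Payload})
	})
	fmt.Println(request(bus, "echo", "ping", demoTimeout))
}
```

The buffered replies channel lets the inbox handler finish even if the requester has already timed out, so no goroutine is left blocked.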
// Always use functional options for configuration
broker, err := pubbing.New(
pubbing.WithRetention(10000, 1*time.Hour),
pubbing.WithBackpressure(pubbing.BackpressureModeDropOldest),
)
err = broker.Publish("topic", payload,
pubbing.WithHeaders(map[string]string{"trace-id": "123"}),
pubbing.WithReplyTo("response.topic"),
)
// Always pass context to Subscribe
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sub, err := broker.Subscribe(ctx, "pattern", handler)
// Subscription automatically unsubscribes when ctx is cancelled
// In shutdown:
// 1. Stop accepting new publishes
// 2. Wait for pending acks (with timeout)
// 3. Cancel all subscriptions
// 4. Wait for subscriber goroutines to exit
// 5. Close broker loop
broker.Shutdown(5 * time.Second)
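A bounded wait for subscriber goroutines, as in steps 1, 3, and 4 above, might be sketched as follows. This is not the broker's Shutdown: miniBroker, its fields, the error return, and the worker helpers are assumptions for illustration, and step 2 (pending acks) is omitted.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const demoTimeout = 50 * time.Millisecond

var (
	errBrokerClosed    = errors.New("broker is closed")
	errShutdownTimeout = errors.New("shutdown timed out waiting for subscribers")
)

type miniBroker struct {
	closed atomic.Bool
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// newMiniBroker starts n subscriber goroutines that run work until done.
func newMiniBroker(n int, work func(ctx context.Context)) *miniBroker {
	ctx, cancel := context.WithCancel(context.Background())
	b := &miniBroker{cancel: cancel}
	for i := 0; i < n; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			work(ctx)
		}()
	}
	return b
}

// Publish rejects new messages once shutdown has begun.
func (b *miniBroker) Publish() error {
	if b.closed.Load() {
		return errBrokerClosed
	}
	return nil
}

// Shutdown stops intake, cancels subscriptions, and waits up to timeout.
func (b *miniBroker) Shutdown(timeout time.Duration) error {
	b.closed.Store(true) // 1. stop accepting new publishes
	b.cancel()           // 3. cancel all subscriptions

	done := make(chan struct{})
	go func() {
		b.wg.Wait() // 4. wait for subscriber goroutines to exit
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errShutdownTimeout
	}
}

// wellBehaved exits as soon as its context is cancelled.
func wellBehaved(ctx context.Context) { <-ctx.Done() }

// slowWorker ignores cancellation, simulating a blocked handler.
func slowWorker(ctx context.Context) { time.Sleep(10 * demoTimeout) }

func main() {
	fmt.Println(newMiniBroker(3, wellBehaved).Shutdown(demoTimeout))
	fmt.Println(newMiniBroker(1, slowWorker).Shutdown(demoTimeout))
}
```

The helper goroutine around wg.Wait is what turns an unbounded wait into one that can race against the timer.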
// Subscription goroutine MUST recover from handler panics
func (s *Subscription) safeExecuteHandler(msg *Message) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("handler panic: %v", r)
// Log stack trace
}
}()
return s.handler(msg)
}
- Single pub/sub: 500k+ msgs/sec
- 10x10: 250k+ msgs/sec
- 100x100: 100k+ msgs/sec
- Latency P50/P99: <100µs / <1ms
- Per subscription overhead: <1KB
- Topics: 100k+ topics
- Subscribers: 10k+ active subscriptions
- Minimize allocations (sync.Pool, pre-allocate)
- Lock-free paths (atomics, single-threaded loop)
- Efficient data structures (trie, ring buffer, maps)
- Profile before optimizing (use pprof)
- DON'T use locks in broker loop - It's single-threaded by design
- DON'T block in subscriber handlers - They block delivery
- DON'T create unbounded channels - Set buffer limits
- DON'T forget to unsubscribe - Causes goroutine leaks
- DON'T panic in production code - Only in New() for invalid config
- DON'T use literal strings - ALWAYS use constants
- DON'T assume ordering across topics - Only per-topic ordering guaranteed
- DON'T forget race detector - ALWAYS run tests with -race
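To illustrate the bounded-channel rule above, here is one way a drop-oldest policy could be applied to a subscriber's buffer. This is a sketch only: sendDropOldest is a hypothetical helper, it is unrelated to how BackpressureModeDropOldest is implemented, and it assumes a single producer per channel.

```go
package main

import "fmt"

// sendDropOldest delivers v on a bounded channel without ever blocking.
// When the buffer is full it discards the oldest queued value to make room.
// It reports whether anything was dropped. Assumes ch has capacity >= 1
// and that only one goroutine sends on ch.
func sendDropOldest(ch chan string, v string) (dropped bool) {
	for {
		select {
		case ch <- v:
			return dropped
		default:
			// Buffer full: fall through and evict the oldest value.
		}
		select {
		case <-ch:
			dropped = true
		default:
			// A consumer drained the buffer in the meantime; retry the send.
		}
	}
}

func main() {
	ch := make(chan string, 2)
	for _, v := range []string{"a", "b", "c"} {
		fmt.Println(v, sendDropOldest(ch, v))
	}
	fmt.Println(<-ch, <-ch)
}
```

The buffer size stays fixed, so a slow consumer costs at most the configured number of queued messages instead of growing memory without bound.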
The architecture is designed to support multi-instance/distributed operation:
- Storage abstraction (Redis, S3 backends)
- Partition strategy (PartitionID + LocalSequence)
- Broker clustering (Raft/etcd)
- Distributed queue groups
- Sequence number evolution to (PartitionID, Sequence) tuples
Keep this in mind when making design decisions - don't paint into corners.
Publish → Broker API → publishCmd → Broker Loop → Topic Trie Match
→ Dispatch to Subscribers → Subscriber Goroutines → Handler/Channel
→ Ack/Nack → Broker Loop → Update State
- Ephemeral: No replay, only new messages
- Durable: Named, tracks position, can replay
- Callback: Handler function (auto-ack based on return)
- Channel: User reads from channel (manual ack)
- Queue Group: Load balanced across members
DefaultRetentionMessages = 10000
DefaultRetentionAge = 1 * time.Hour
DefaultAckTimeout = 30 * time.Second
DefaultMaxRedelivery = 3
DefaultBufferSize = 100
- Every exported type/function/constant MUST have godoc comment
- Comments are complete sentences starting with the name
- Explain WHY, not WHAT (code shows what)
- Mark TODOs with reason: // TODO(username): Fix race condition when... #123
- Write runnable examples in examples_test.go
MIT License - See LICENSE file