Bound Go Concurrency with Runtime-Adjustable BlockingLatch
BlockingLatch uses sync.Cond to enforce strict in-flight work limits, block producers immediately at capacity, reuse slots instantly on completion, adjust ceilings at runtime, and drain to zero on shutdown—ideal for streams under sustained pressure.
Block Producers at Capacity Ceiling Without Internal Buffering
Unbounded concurrency in Go systems like AMQP stream consumers leads to silent memory growth as messages pile up in-flight. Producers must block exactly when in-flight work hits a hard ceiling, propagating overload outward to brokers or upstreams instead of absorbing it internally. This ensures pressure stays visible and systems remain observable under load. Capacity must reuse immediately on completion—no lag—and support runtime adjustments based on downstream health or resources, plus optional deterministic drain to zero on shutdown.
Buffered channels handle static limits by blocking sends when full, but fail for dynamic ceilings: capacity fixes at creation, requiring messy channel swaps and draining under load, which disrupts flow.
sync.Cond Powers Precise, Shared Invariant Coordination
Pair sync.Cond with a mutex to let multiple goroutines wait on the same condition (current count < max) without busy-spinning. Producers call Increment(), which locks, loops until space available (cond.Wait() atomically unlocks and suspends), then increments count and unlocks. Decrement() drops count and signals one waiter via cond.Signal(). SetMax(newMax) updates max and broadcasts to wake all, letting them recheck against the new limit.
type BlockingLatch struct {
count uint
max uint
lock *sync.Mutex
notMax *sync.Cond
}
func (l *BlockingLatch) Increment() {
l.lock.Lock()
for l.count >= l.max {
l.notMax.Wait()
}
l.count++
l.lock.Unlock()
}
func (l *BlockingLatch) Decrement() {
l.lock.Lock()
l.count--
l.notMax.Signal()
l.lock.Unlock()
}
func (l *BlockingLatch) SetMax(max uint) {
l.lock.Lock()
l.max = max
l.notMax.Broadcast()
l.lock.Unlock()
}
func (l *BlockingLatch) WaitForEmpty() {
l.lock.Lock()
for l.count > 0 {
l.notMax.Wait()
}
l.lock.Unlock()
}
This keeps in-flight work provably <= max, with zero buffer overhead—producers block because the system is saturated, not a queue.
Apply to Streams, Pools, and Pipelines for Sustained Pressure
In AMQP consumers, initialize latch at prefetch count; Increment() before forwarding message, tie Ack/Nack to Decrement(). Brokers see backpressure directly, no internal queuing.
latch := util.NewBlockingLatch(uint(source.GetPrefetchCount()))
handleMessages := func(ctx stream.ConsumerContext, message *amqp.Message) {
latch.Increment() // blocks at ceiling
messageChannel <- message
stm.Ack = func() { latch.Decrement() }
stm.Nack = func() { latch.Decrement() }
}
Extend to HTTP throttling (bound requests), worker pools (cap goroutines), batch pipelines (block fast producers), or resource pools (FDs/connections). Use WaitForEmpty() for clean shutdown sync. Adjust max runtime without restarts, sustaining throughput at limits while demand exists—perfect for enterprise systems where backpressure is a core design requirement, not afterthought.