Bound Go Concurrency with Runtime-Adjustable BlockingLatch

BlockingLatch pairs sync.Cond with a mutex to enforce a strict in-flight work limit: producers block the moment capacity is reached, slots are reused the instant work completes, the ceiling can be adjusted at runtime, and the latch drains to zero on shutdown. That makes it a good fit for streams under sustained pressure.

Block Producers at Capacity Ceiling Without Internal Buffering

Unbounded concurrency in Go systems such as AMQP stream consumers leads to silent memory growth as messages pile up in flight. Producers must block exactly when in-flight work hits a hard ceiling, propagating overload outward to brokers or upstreams instead of absorbing it internally. This keeps pressure visible and the system observable under load. Freed capacity must be reusable immediately on completion, with no lag, and the ceiling must support runtime adjustment based on downstream health or available resources, plus an optional deterministic drain to zero on shutdown.

Buffered channels handle static limits well: sends block when the buffer is full. They fail for dynamic ceilings, though, because capacity is fixed at creation; changing it means swapping and draining channels under load, which disrupts flow.
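
For contrast, the fixed-limit pattern with a buffered channel looks roughly like this (a minimal sketch; the function name, string payload, and process callback are placeholders):

func consumeWithChannelSemaphore(messages <-chan string, maxInFlight int, process func(string)) {
  sem := make(chan struct{}, maxInFlight) // buffer size is the ceiling, fixed at make()
  for msg := range messages {
    sem <- struct{}{} // acquire: blocks once maxInFlight items are outstanding
    go func(m string) {
      defer func() { <-sem }() // release the slot when the work completes
      process(m)
    }(msg)
  }
}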

sync.Cond Powers Precise, Shared Invariant Coordination

Pair sync.Cond with a mutex to let multiple goroutines wait on the same condition (current count < max) without busy-spinning. Producers call Increment(), which locks, loops until space is available (cond.Wait() atomically unlocks and suspends), then increments the count and unlocks. Decrement() drops the count and signals one waiter via cond.Signal(), broadcasting when the count reaches zero so WaitForEmpty callers also recheck. SetMax(newMax) updates max and broadcasts to wake all waiters, letting each recheck against the new limit.

type BlockingLatch struct {
  count  uint
  max    uint
  lock   *sync.Mutex
  notMax *sync.Cond
}

// Increment blocks the caller until the in-flight count is below max,
// then claims a slot.
func (l *BlockingLatch) Increment() {
  l.lock.Lock()
  for l.count >= l.max {
    l.notMax.Wait() // atomically unlocks, suspends, relocks on wake
  }
  l.count++
  l.lock.Unlock()
}

// Decrement releases a slot and wakes a blocked producer; when the count
// reaches zero it broadcasts so WaitForEmpty callers also recheck.
func (l *BlockingLatch) Decrement() {
  l.lock.Lock()
  l.count--
  if l.count == 0 {
    l.notMax.Broadcast()
  } else {
    l.notMax.Signal()
  }
  l.lock.Unlock()
}

// SetMax adjusts the ceiling at runtime and wakes every waiter so each
// one rechecks against the new limit.
func (l *BlockingLatch) SetMax(max uint) {
  l.lock.Lock()
  l.max = max
  l.notMax.Broadcast()
  l.lock.Unlock()
}

// WaitForEmpty blocks until all in-flight work has drained to zero.
func (l *BlockingLatch) WaitForEmpty() {
  l.lock.Lock()
  for l.count > 0 {
    l.notMax.Wait()
  }
  l.lock.Unlock()
}
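
The usage below calls a NewBlockingLatch constructor that the excerpt above does not show; a minimal version, assuming the same package, just wires the sync.Cond to the latch's mutex:

func NewBlockingLatch(max uint) *BlockingLatch {
  lock := &sync.Mutex{}
  return &BlockingLatch{
    max:    max,
    lock:   lock,
    notMax: sync.NewCond(lock), // the Cond must share the latch's mutex
  }
}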

This keeps in-flight work provably at or below max with zero buffer overhead: producers block because the system itself is saturated, not because a queue has filled up.
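
A small standalone demo, sketched under the assumption that the BlockingLatch definitions above live in the same main package, shows the behavior: producers stall once two items are in flight, slots free as soon as work completes, and WaitForEmpty() returns only after everything drains.

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  latch := NewBlockingLatch(2) // at most two items in flight

  var wg sync.WaitGroup
  for i := 0; i < 5; i++ {
    wg.Add(1)
    i := i // capture for pre-Go 1.22 loop semantics
    go func() {
      defer wg.Done()
      latch.Increment() // blocks while two items are already in flight
      fmt.Println("started", i)
      go func() {
        time.Sleep(100 * time.Millisecond) // simulated work
        latch.Decrement()                  // frees the slot immediately
      }()
    }()
  }

  wg.Wait()            // every producer has claimed a slot
  latch.WaitForEmpty() // blocks until all in-flight work drains to zero
  fmt.Println("drained")
}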

Apply to Streams, Pools, and Pipelines for Sustained Pressure

In AMQP stream consumers, initialize the latch to the prefetch count, call Increment() before forwarding each message, and tie both Ack and Nack to Decrement(). The broker sees backpressure directly; nothing queues internally.

latch := util.NewBlockingLatch(uint(source.GetPrefetchCount()))
handleMessages := func(ctx stream.ConsumerContext, message *amqp.Message) {
  latch.Increment() // blocks here once in-flight work hits the ceiling

  // stm is the stream-message wrapper handed downstream (built elsewhere);
  // wire its callbacks before forwarding so a fast consumer cannot ack first.
  stm.Ack = func() { latch.Decrement() }  // slot freed on ack...
  stm.Nack = func() { latch.Decrement() } // ...or on nack
  messageChannel <- message
}

Extend the pattern to HTTP throttling (bound in-flight requests), worker pools (cap goroutines), batch pipelines (block fast producers), or resource pools (file descriptors, connections). Use WaitForEmpty() for clean shutdown synchronization, and adjust max at runtime without restarts, sustaining throughput at the limit for as long as demand exists; see the sketch below. That makes it a fit for enterprise systems where backpressure is a core design requirement, not an afterthought.
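
As a sketch with hypothetical helpers (the healthy channel and stopProducing callback are illustrative; the latch type is the one above), the ceiling can track downstream health and shutdown can drain deterministically:

// Resize the ceiling while traffic flows: blocked producers recheck
// against the new max as soon as SetMax broadcasts.
func adjustCeiling(latch *BlockingLatch, healthy <-chan bool) {
  for ok := range healthy { // hypothetical downstream health signal
    if ok {
      latch.SetMax(64) // downstream healthy: open up
    } else {
      latch.SetMax(8) // degraded: squeeze in-flight work
    }
  }
}

// Shutdown: stop producing first, then block until every slot is released.
func drainOnShutdown(latch *BlockingLatch, stopProducing func()) {
  stopProducing()      // no new Increment calls after this
  latch.WaitForEmpty() // returns once in-flight work drains to zero
}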
