Documentation
Overview ¶
Package buffer provides generic, thread-safe buffer implementations with various overflow policies.
This package offers:
- CircularBuffer: Fixed-size buffer with configurable overflow policies
- Support for DropOldest, DropNewest, and Block overflow policies
- Statistics always enabled for observability
- Optional Prometheus metrics integration via functional options
All buffer implementations are thread-safe and always collect statistics for observability. Prometheus metrics can be optionally enabled via the WithMetrics() functional option.
The buffer package implements high-performance circular buffers for managing data flow between producers and consumers in concurrent systems. Buffers are generic, thread-safe, and provide comprehensive observability through always-on statistics and optional metrics.
Quick Start ¶
Basic buffer creation:
buf, err := buffer.NewCircularBuffer[int](1000)
if err != nil {
log.Fatal(err)
}
// Write data
err = buf.Write(42)
// Read data
value, ok := buf.Read()
With overflow policy and metrics:
buf, err := buffer.NewCircularBuffer[[]byte](5000,
    buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
    buffer.WithMetrics[[]byte](registry, "network_input"),
)
Overflow Policies ¶
The buffer supports three overflow behaviors when capacity is reached:
- DropOldest: Remove oldest item to make room (default)
- DropNewest: Reject new items when full
- Block: Write operations wait for available space
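The three behaviors can be sketched as a switch on the policy at the moment a write finds the buffer full. This is a simplified, self-contained illustration of the decision, not the package's actual code (the real Block policy waits on a condition variable rather than returning an error):

```go
package main

import (
	"errors"
	"fmt"
)

type OverflowPolicy int

const (
	DropOldest OverflowPolicy = iota
	DropNewest
	Block
)

var errFull = errors.New("buffer full")

// writeFull decides what happens when a write arrives at a full buffer.
// items is assumed to already be at capacity.
func writeFull(policy OverflowPolicy, items []int, v int) ([]int, error) {
	switch policy {
	case DropOldest:
		// Evict the oldest item and accept the new one.
		return append(items[1:], v), nil
	case DropNewest:
		// Reject the new item; the buffer is unchanged.
		return items, errFull
	case Block:
		// The real implementation parks the writer until space frees up.
		return items, errors.New("would block until space is available")
	}
	return items, nil
}

func main() {
	full := []int{1, 2, 3}
	out, _ := writeFull(DropOldest, full, 4)
	fmt.Println(out) // [2 3 4]
}
```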
Example with blocking policy:
buf, _ := buffer.NewCircularBuffer[*Event](100,
    buffer.WithOverflowPolicy[*Event](buffer.Block),
)

// Write with timeout when using Block policy
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := buf.WriteWithContext(ctx, event)
Observability Architecture ¶
The buffer package implements a dual-tracking pattern for comprehensive observability:
Statistics (Always On):
- Tracks all operations using atomic counters
- Zero configuration required
- Available via buf.Stats()
- Provides computed metrics (throughput, drop rate, utilization)
- No external dependencies
Prometheus Metrics (Optional):
- Enabled via WithMetrics() option
- Exports to Prometheus for time-series monitoring
- Includes component labels for instance identification
- Standard metric types (Counter, Gauge)
Design Decision: Dual Tracking Pattern ¶
Both Statistics and Metrics track operations independently, which appears redundant but serves distinct operational purposes:
Why Track Twice?
1. Independence: Statistics work without Prometheus dependency
- Always available for debugging, even in minimal deployments
- No external infrastructure required for basic observability
2. Computed Metrics: Statistics provide derived values not available in raw Prometheus
- Throughput (ops/sec) with built-in timing
- Drop rate as percentage (drops / writes)
- Overflow rate as percentage (overflows / writes)
- Utilization relative to capacity
3. Different Use Cases:
- Statistics: Programmatic access, debugging, tests, local monitoring
- Metrics: Time-series analysis, dashboards, alerting, production monitoring
4. Performance Trade-off:
- Overhead: ~50-100ns per operation for dual tracking
- At 100k ops/sec: ~0.5-1% total overhead
- Cost is negligible compared to observability value
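The dual-tracking pattern described above can be sketched with atomic counters alongside an optional metrics hook. All names here are illustrative stand-ins, not the package's internals; the metrics hook stands in for a Prometheus counter:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// metricsHook stands in for an optional Prometheus counter increment;
// nil means metrics are disabled.
type metricsHook func(name string)

type trackedBuffer struct {
	writes atomic.Int64 // statistics: always on, lock-free
	drops  atomic.Int64
	emit   metricsHook // metrics: optional second tracking path
}

// recordWrite increments both tracking paths independently.
func (b *trackedBuffer) recordWrite() {
	b.writes.Add(1) // statistics: always available
	if b.emit != nil {
		b.emit("writes") // metrics: only when enabled
	}
}

func (b *trackedBuffer) recordDrop() {
	b.drops.Add(1)
	if b.emit != nil {
		b.emit("drops")
	}
}

// dropRate is a computed metric derived from the raw counters --
// the kind of value the Statistics side provides without Prometheus.
func (b *trackedBuffer) dropRate() float64 {
	w := b.writes.Load()
	if w == 0 {
		return 0
	}
	return float64(b.drops.Load()) / float64(w)
}

func main() {
	b := &trackedBuffer{} // metrics disabled: emit is nil
	for i := 0; i < 4; i++ {
		b.recordWrite()
	}
	b.recordDrop()
	fmt.Printf("drop rate: %.2f\n", b.dropRate()) // drop rate: 0.25
}
```

Because the statistics path is plain atomic loads and stores, it keeps working (and stays fast) whether or not the metrics hook is wired up, which is exactly the independence argument above.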
Alternative Considered: Metrics-Based Statistics
We considered reading Statistics from Prometheus metrics to avoid duplication:
func (s *Statistics) Writes() int64 {
dto := &dto.Metric{}
s.metrics.writes.Write(dto)
return int64(dto.Counter.GetValue())
}
Rejected because:
- Creates Prometheus dependency for basic stats
- Reading from Prometheus is slower (~10x) than atomic operations
- Breaks Statistics when metrics are disabled
- Violates separation of concerns
Performance Impact ¶
Dual tracking overhead per operation:
- 1x atomic increment (Statistics)
- 1x atomic increment (Prometheus counter) if enabled
- 1x gauge set (Prometheus) if enabled
Benchmarks (M3 MacBook Pro):
- Write with stats only: ~97ns/op
- Write with stats + metrics: ~102ns/op (~5% overhead)
- Read with stats only: ~69ns/op
- Read with stats + metrics: ~72ns/op (~4% overhead)
At high throughput (100k ops/sec), dual tracking adds ~5-10ms/sec of overhead, which is acceptable for the operational visibility gained.
Thread Safety ¶
All buffer operations are thread-safe for concurrent use:
- Multiple producers can write concurrently
- Multiple consumers can read concurrently
- Statistics use atomic operations (lock-free)
- Metrics use Prometheus atomic types
- Internal state protected by sync.RWMutex
- Block policy uses sync.Cond for waiting
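As an illustration of the guarantees above, here is a minimal mutex-protected ring buffer exercised by concurrent producers. This is a simplified stand-in for the sketch only, not the package's implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a minimal fixed-capacity circular buffer guarded by a mutex.
type ring struct {
	mu         sync.Mutex
	data       []int
	head, size int
}

func newRing(capacity int) *ring {
	return &ring{data: make([]int, capacity)}
}

// write appends an item, overwriting the oldest when full
// (DropOldest-style behavior).
func (r *ring) write(v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	idx := (r.head + r.size) % len(r.data)
	r.data[idx] = v
	if r.size < len(r.data) {
		r.size++
	} else {
		r.head = (r.head + 1) % len(r.data) // advance past the overwritten item
	}
}

func (r *ring) len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.size
}

func main() {
	r := newRing(64)
	var wg sync.WaitGroup
	// Multiple producers writing concurrently is safe: the mutex
	// serializes all access to head, size, and the backing array.
	for p := 0; p < 4; p++ {
		wg.Add(1)
		go func(base int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				r.write(base*100 + i)
			}
		}(p)
	}
	wg.Wait()
	fmt.Println("size after 400 concurrent writes:", r.len()) // 64 (capped at capacity)
}
```

Running a sketch like this under `go test -race` (as the package's own test suite does) is the practical way to validate such invariants.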
API Design Patterns ¶
Functional Options:
The package uses functional options for clean, composable configuration:
buf, _ := buffer.NewCircularBuffer[T](capacity,
    buffer.WithOverflowPolicy[T](policy),
    buffer.WithMetrics[T](registry, prefix),
    buffer.WithDropCallback[T](callback),
)
This pattern provides:
- Clear intent with named functions
- Easy composition of features
- Backward compatibility when adding options
- Type-safe configuration
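The functional options pattern itself can be sketched generically. Types and names here are illustrative, not the package's actual internals:

```go
package main

import "fmt"

// Config holds configurable settings with sensible defaults.
type Config struct {
	policy string
	prefix string
}

// Option mutates the Config; each With* constructor returns one.
type Option func(*Config)

func WithPolicy(p string) Option { return func(c *Config) { c.policy = p } }
func WithPrefix(s string) Option { return func(c *Config) { c.prefix = s } }

// New applies defaults first, then each supplied option in order.
// Adding new options later never breaks existing callers, which is
// where the backward-compatibility benefit comes from.
func New(capacity int, opts ...Option) *Config {
	c := &Config{policy: "drop-oldest"} // default
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	cfg := New(100, WithPolicy("block"), WithPrefix("events"))
	fmt.Println(cfg.policy, cfg.prefix) // block events
}
```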
Generic Types:
Buffers are fully generic and work with any Go type:
intBuffer, _ := buffer.NewCircularBuffer[int](100)
byteBuffer, _ := buffer.NewCircularBuffer[[]byte](1000)
structBuffer, _ := buffer.NewCircularBuffer[*MyStruct](500)
Performance Characteristics ¶
Operations:
- Write: O(1) constant time
- Read: O(1) constant time
- ReadBatch: O(n) where n is batch size
- Peek: O(1) constant time
- Size/IsFull/IsEmpty: O(1) constant time
Memory:
- Pre-allocated circular array
- No dynamic allocations during operation
- Memory usage: capacity * sizeof(T)
- Statistics overhead: ~200 bytes
- Metrics overhead: ~1KB when enabled
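The capacity * sizeof(T) figure above can be estimated directly with unsafe.Sizeof. Note this counts only the pre-allocated backing array; for pointer or slice element types it does not include whatever the elements reference:

```go
package main

import (
	"fmt"
	"unsafe"
)

type Event struct {
	ID   int64
	Name string
}

// bufferBytes estimates the backing-array footprint of a
// pre-allocated circular buffer holding capacity elements of type T.
func bufferBytes[T any](capacity int) uintptr {
	var zero T
	return uintptr(capacity) * unsafe.Sizeof(zero)
}

func main() {
	// On a 64-bit platform: 1000 * 8 = 8000 bytes for int,
	// 1000 * 8 = 8000 bytes of pointers for *Event.
	fmt.Println("int buffer (1000):", bufferBytes[int](1000), "bytes")
	fmt.Println("*Event buffer (1000):", bufferBytes[*Event](1000), "bytes")
}
```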
Common Use Cases ¶
Network Packet Buffering:
udpBuffer, err := buffer.NewCircularBuffer[[]byte](10000,
    buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
    buffer.WithMetrics[[]byte](registry, "udp_input"),
)
Event Processing Pipeline:
eventBuffer, err := buffer.NewCircularBuffer[*Event](1000,
    buffer.WithOverflowPolicy[*Event](buffer.Block),
    buffer.WithMetrics[*Event](registry, "events"),
)
Rate-Limited Processing:
taskBuffer, _ := buffer.NewCircularBuffer[*Task](500,
buffer.WithOverflowPolicy[*Task](buffer.DropNewest),
buffer.WithDropCallback[*Task](func(t *Task) {
log.Printf("Dropped task: %s", t.ID)
}),
)
Testing ¶
The package includes comprehensive tests with race detection:
go test -race ./pkg/buffer
Benchmarks are available to validate performance:
go test -bench=. ./pkg/buffer
Examples ¶
See buffer_test.go and examples_test.go for runnable examples that appear in godoc.
Index ¶
- Constants
- func WithStats(ctx context.Context, stats *Statistics) context.Context
- type Buffer
- type DropCallback
- type Option
- type OverflowPolicy
- type Statistics
- func (s *Statistics) CurrentSize() int64
- func (s *Statistics) Drop()
- func (s *Statistics) DropRate() float64
- func (s *Statistics) Drops() int64
- func (s *Statistics) MaxSize() int64
- func (s *Statistics) MemoryUsage() int64
- func (s *Statistics) Overflow()
- func (s *Statistics) OverflowRate() float64
- func (s *Statistics) Overflows() int64
- func (s *Statistics) Peek()
- func (s *Statistics) Peeks() int64
- func (s *Statistics) Read()
- func (s *Statistics) ReadThroughput() float64
- func (s *Statistics) Reads() int64
- func (s *Statistics) Reset()
- func (s *Statistics) Summary() StatsSummary
- func (s *Statistics) Throughput() float64
- func (s *Statistics) UpdateMemoryUsage(usage int64)
- func (s *Statistics) UpdateSize(size int64)
- func (s *Statistics) Uptime() time.Duration
- func (s *Statistics) Utilization(capacity int64) float64
- func (s *Statistics) Write()
- func (s *Statistics) Writes() int64
- type StatsSummary
Constants ¶
const (
// ContextKeyStats can be used to pass statistics through context.
ContextKeyStats contextKey = "buffer-stats"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Buffer ¶
type Buffer[T any] interface {
    // Write adds an item to the buffer. Returns an error if the operation fails.
    // Behavior depends on the overflow policy when the buffer is full.
    Write(item T) error

    // Read retrieves and removes one item from the buffer.
    // Returns the item and true if successful, zero value and false if the buffer is empty.
    Read() (T, bool)

    // ReadBatch retrieves and removes up to max items from the buffer.
    // Returns a slice containing the retrieved items (may be shorter than max).
    ReadBatch(max int) []T

    // Peek retrieves one item without removing it from the buffer.
    // Returns the item and true if successful, zero value and false if the buffer is empty.
    Peek() (T, bool)

    // Size returns the current number of items in the buffer.
    Size() int

    // Capacity returns the maximum number of items the buffer can hold.
    Capacity() int

    // IsFull returns true if the buffer is at maximum capacity.
    IsFull() bool

    // IsEmpty returns true if the buffer contains no items.
    IsEmpty() bool

    // Clear removes all items from the buffer.
    Clear()

    // Stats returns buffer statistics (always available for observability).
    Stats() *Statistics

    // Close shuts down the buffer and releases any resources.
    Close() error
}
Buffer represents a generic buffer interface that all buffer implementations must satisfy. The buffer is parameterized by item type T for type safety.
func NewCircularBuffer ¶
NewCircularBuffer creates a new circular buffer with the specified capacity and options. Stats are ALWAYS collected for observability. Metrics are optional via WithMetrics(). Returns an error if metrics registration fails when metrics are requested. Capacity is required - all other configuration is via functional options.
type DropCallback ¶
type DropCallback[T any] func(item T)
DropCallback is called when an item is dropped due to overflow policy. It receives the item that was dropped.
type Option ¶
type Option[T any] func(*bufferOptions[T])
Option configures buffer behavior using the functional options pattern. This provides a clean, extensible API for configuring buffers.
func WithDropCallback ¶
func WithDropCallback[T any](callback DropCallback[T]) Option[T]
WithDropCallback sets a callback function that is called when items are dropped. The callback receives the item that was dropped.
func WithMetrics ¶
func WithMetrics[T any](registry *metric.MetricsRegistry, prefix string) Option[T]
WithMetrics enables Prometheus metrics export for buffer statistics. If registry is nil, this option is ignored. Registry should not be nil in normal usage - this handles edge cases gracefully.
func WithOverflowPolicy ¶
func WithOverflowPolicy[T any](policy OverflowPolicy) Option[T]
WithOverflowPolicy sets the overflow behavior for the buffer. Defaults to DropOldest if not specified.
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy defines how the buffer behaves when it reaches capacity.
const (
    // DropOldest removes the oldest item to make room for new items.
    DropOldest OverflowPolicy = iota
    // DropNewest drops new items when the buffer is full.
    DropNewest
    // Block causes Write operations to block until space is available.
    Block
)
func (OverflowPolicy) String ¶
func (p OverflowPolicy) String() string
String returns a human-readable representation of the overflow policy.
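A typical String implementation for an enum like this uses a switch with a fallback for out-of-range values. This is a sketch; the exact strings the package returns may differ:

```go
package main

import "fmt"

type OverflowPolicy int

const (
	DropOldest OverflowPolicy = iota
	DropNewest
	Block
)

// String maps each policy constant to a readable name, with a
// numeric fallback so unknown values still print something useful.
func (p OverflowPolicy) String() string {
	switch p {
	case DropOldest:
		return "DropOldest"
	case DropNewest:
		return "DropNewest"
	case Block:
		return "Block"
	default:
		return fmt.Sprintf("OverflowPolicy(%d)", int(p))
	}
}

func main() {
	// fmt uses the Stringer interface automatically.
	fmt.Println(DropOldest, Block) // DropOldest Block
}
```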
type Statistics ¶
type Statistics struct {
// contains filtered or unexported fields
}
Statistics tracks buffer performance metrics.
func NewStatistics ¶
func NewStatistics() *Statistics
NewStatistics creates a new statistics tracker.
func StatsFromContext ¶
func StatsFromContext(ctx context.Context) (*Statistics, bool)
StatsFromContext retrieves statistics from the context.
func (*Statistics) CurrentSize ¶
func (s *Statistics) CurrentSize() int64
CurrentSize returns the current number of items in the buffer.
func (*Statistics) Drop ¶
func (s *Statistics) Drop()
Drop records an item drop due to overflow policy.
func (*Statistics) DropRate ¶
func (s *Statistics) DropRate() float64
DropRate returns the percentage of writes that resulted in drops (0.0 to 1.0).
func (*Statistics) Drops ¶
func (s *Statistics) Drops() int64
Drops returns the total number of dropped items.
func (*Statistics) MaxSize ¶
func (s *Statistics) MaxSize() int64
MaxSize returns the maximum number of items the buffer has held.
func (*Statistics) MemoryUsage ¶
func (s *Statistics) MemoryUsage() int64
MemoryUsage returns the estimated memory usage in bytes.
func (*Statistics) Overflow ¶
func (s *Statistics) Overflow()
Overflow records a buffer overflow event.
func (*Statistics) OverflowRate ¶
func (s *Statistics) OverflowRate() float64
OverflowRate returns the percentage of writes that caused overflows (0.0 to 1.0).
func (*Statistics) Overflows ¶
func (s *Statistics) Overflows() int64
Overflows returns the total number of overflow events.
func (*Statistics) Peeks ¶
func (s *Statistics) Peeks() int64
Peeks returns the total number of peek operations.
func (*Statistics) ReadThroughput ¶
func (s *Statistics) ReadThroughput() float64
ReadThroughput returns the average number of reads per second.
func (*Statistics) Reads ¶
func (s *Statistics) Reads() int64
Reads returns the total number of read operations.
func (*Statistics) Summary ¶
func (s *Statistics) Summary() StatsSummary
Summary returns a snapshot of all statistics.
func (*Statistics) Throughput ¶
func (s *Statistics) Throughput() float64
Throughput returns the average number of writes per second.
func (*Statistics) UpdateMemoryUsage ¶
func (s *Statistics) UpdateMemoryUsage(usage int64)
UpdateMemoryUsage updates the estimated memory usage.
func (*Statistics) UpdateSize ¶
func (s *Statistics) UpdateSize(size int64)
UpdateSize updates the current buffer size.
func (*Statistics) Uptime ¶
func (s *Statistics) Uptime() time.Duration
Uptime returns how long the buffer has been running.
func (*Statistics) Utilization ¶
func (s *Statistics) Utilization(capacity int64) float64
Utilization returns the current buffer utilization as a percentage (0.0 to 1.0).
func (*Statistics) Writes ¶
func (s *Statistics) Writes() int64
Writes returns the total number of write operations.
type StatsSummary ¶
type StatsSummary struct {
Writes int64 `json:"writes"`
Reads int64 `json:"reads"`
Peeks int64 `json:"peeks"`
Overflows int64 `json:"overflows"`
Drops int64 `json:"drops"`
CurrentSize int64 `json:"current_size"`
MaxSize int64 `json:"max_size"`
MemoryUsage int64 `json:"memory_usage"`
Throughput float64 `json:"throughput"`
ReadThroughput float64 `json:"read_throughput"`
DropRate float64 `json:"drop_rate"`
OverflowRate float64 `json:"overflow_rate"`
Uptime time.Duration `json:"uptime"`
}
StatsSummary is a point-in-time snapshot of all statistics, suitable for JSON serialization.