package buffer

v1.0.0-alpha.2
Published: Mar 4, 2026 License: MIT Imports: 7 Imported by: 0

README

Buffer Package

A high-performance, thread-safe buffer implementation for Go with configurable overflow policies, built-in statistics, and optional Prometheus metrics integration.

Features

  • 🚀 High Performance: Lock-free operations where possible, optimized for throughput
  • 📊 Always Observable: Statistics always enabled (never operate in the dark)
  • 📈 Prometheus Ready: Optional metrics integration for production monitoring
  • 🔄 Flexible Overflow: DropOldest, DropNewest, or Block policies
  • 🎯 Type-Safe: Full generic support for any item type
  • 🧩 Functional Options: Clean, composable configuration API
  • 🔒 Thread-Safe: Safe for concurrent producers and consumers

Installation

import "github.com/c360/semstreams/pkg/buffer"

Quick Start

Basic Usage
// Create a circular buffer with capacity 1000 (stats always enabled)
buf, err := buffer.NewCircularBuffer[int](1000)
if err != nil {
    log.Fatal(err)
}

// Write items
err = buf.Write(42)

// Read items
item, ok := buf.Read()

// Check buffer state
size := buf.Size()
isFull := buf.IsFull()

With Overflow Policy
// Buffer that drops oldest items when full
buf, _ := buffer.NewCircularBuffer[string](100,
    buffer.WithOverflowPolicy[string](buffer.DropOldest),
)

// Buffer that drops newest items when full
buf, _ := buffer.NewCircularBuffer[[]byte](100,
    buffer.WithOverflowPolicy[[]byte](buffer.DropNewest),
)

// Buffer that blocks when full
buf, _ := buffer.NewCircularBuffer[*Event](100,
    buffer.WithOverflowPolicy[*Event](buffer.Block),
)

With Prometheus Metrics
import "github.com/c360/semstreams/pkg/metric"

// Create metrics registry
registry := metric.NewMetricsRegistry()

// Create buffer with metrics export
buf, _ := buffer.NewCircularBuffer[[]byte](5000,
    buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
    buffer.WithMetrics[[]byte](registry, "network_buffer"),
)

// Metrics automatically exported:
// - semstreams_buffer_writes_total{component="network_buffer"}
// - semstreams_buffer_drops_total{component="network_buffer"}
// - semstreams_buffer_utilization{component="network_buffer"}
// - etc.
With Drop Callback
// Get notified when items are dropped
buf, _ := buffer.NewCircularBuffer[*Message](1000,
    buffer.WithOverflowPolicy[*Message](buffer.DropOldest),
    buffer.WithDropCallback[*Message](func(msg *Message) {
        log.Printf("Dropped message: %s", msg.ID)
        metrics.DroppedMessages.Inc()
    }),
)

Overflow Policies

DropOldest

Removes the oldest item to make room for new items when buffer is full.

buffer.WithOverflowPolicy[T](buffer.DropOldest)

Use Case: When newest data is more important (e.g., real-time telemetry)

DropNewest

Drops incoming items when buffer is full.

buffer.WithOverflowPolicy[T](buffer.DropNewest)

Use Case: When preserving order is critical (e.g., audit logs)

Block

Write operations block until space is available.

buffer.WithOverflowPolicy[T](buffer.Block)

Use Case: When no data loss is acceptable (e.g., transaction processing)

Block Policy with Timeout
buf, _ := buffer.NewCircularBuffer[T](100,
    buffer.WithOverflowPolicy[T](buffer.Block),
)

// Write with timeout
err := buf.WriteWithTimeout(item, 5*time.Second)

// Write with context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = buf.WriteWithContext(ctx, item)

Functional Options

The buffer package uses functional options for clean, composable configuration:

WithOverflowPolicy

Set the overflow behavior (default: DropOldest):

buffer.WithOverflowPolicy[T](buffer.DropOldest)
WithMetrics

Enable Prometheus metrics export:

buffer.WithMetrics[T](registry, "component_name")
WithDropCallback

Set a callback for when items are dropped:

buffer.WithDropCallback[T](func(item T) {
    // Handle dropped item
})

API Reference

Buffer Interface
type Buffer[T any] interface {
    Write(item T) error              // Add item to buffer
    Read() (T, bool)                 // Remove and return oldest item
    ReadBatch(max int) []T           // Remove up to max items
    Peek() (T, bool)                 // View oldest without removing
    Size() int                       // Current number of items
    Capacity() int                   // Maximum capacity
    IsFull() bool                    // Check if at capacity
    IsEmpty() bool                   // Check if empty
    Clear()                          // Remove all items
    Stats() *Statistics              // Buffer statistics (never nil)
    Close() error                    // Shutdown buffer
}
Statistics

Statistics are always collected (not optional) for observability:

stats := buf.Stats()

// Available metrics:
stats.Writes()            // Total write operations
stats.Reads()             // Total read operations
stats.Drops()             // Total items dropped
stats.Overflows()         // Total overflow events
stats.Throughput()        // Writes per second
stats.ReadThroughput()    // Reads per second
stats.DropRate()          // Drop percentage (0.0 to 1.0)
stats.Utilization(cap)    // Buffer usage (0.0 to 1.0)
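The computed values are simple ratios over atomic counters. A self-contained sketch of how DropRate and Utilization might be derived (illustrative, not the package's internals):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// stats sketches the always-on atomic counters behind the Stats API.
type stats struct {
	writes atomic.Int64
	drops  atomic.Int64
}

// dropRate returns drops / writes as a value in 0.0 to 1.0.
func (s *stats) dropRate() float64 {
	w := s.writes.Load()
	if w == 0 {
		return 0
	}
	return float64(s.drops.Load()) / float64(w)
}

// utilization returns current size / capacity as a value in 0.0 to 1.0.
func utilization(size, capacity int64) float64 {
	return float64(size) / float64(capacity)
}

func main() {
	var s stats
	s.writes.Add(200)
	s.drops.Add(10)
	fmt.Println(s.dropRate())           // 10 drops over 200 writes
	fmt.Println(utilization(750, 1000)) // 750 items in a 1000-slot buffer
}
```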

Prometheus Metrics

When enabled via WithMetrics(), the following metrics are exported:

Metric                             Type     Description
semstreams_buffer_writes_total     Counter  Total write operations
semstreams_buffer_reads_total      Counter  Total read operations
semstreams_buffer_peeks_total      Counter  Total peek operations
semstreams_buffer_overflows_total  Counter  Total overflow events
semstreams_buffer_drops_total      Counter  Total items dropped
semstreams_buffer_size             Gauge    Current items in buffer
semstreams_buffer_utilization      Gauge    Buffer usage (0-1)

All metrics include a component label for identifying different buffer instances.

Performance

Benchmark results on MacBook Pro M1:

BenchmarkWrite-12                12,358,087     97.05 ns/op
BenchmarkRead-12                 17,301,534     69.31 ns/op
BenchmarkWriteRead-12             8,407,672    142.4 ns/op
BenchmarkBatchRead-12             3,430,456    349.5 ns/op
BenchmarkConcurrentWrite-12       4,287,906    279.8 ns/op
BenchmarkConcurrentMixed-12       3,015,612    397.3 ns/op
Performance Characteristics
  • Write: ~97ns per operation
  • Read: ~69ns per operation
  • Batch Read: Efficient bulk operations
  • Metrics Overhead: ~5% when enabled, zero when disabled
  • Stats Overhead: Negligible (atomic operations)
  • Memory: Pre-allocated circular buffer, no dynamic allocations

Use Cases

High-Throughput Network Input
func setupNetworkBuffer(registry *metric.MetricsRegistry) (buffer.Buffer[[]byte], error) {
    return buffer.NewCircularBuffer[[]byte](10000,
        buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
        buffer.WithMetrics[[]byte](registry, "udp_input"),
        buffer.WithDropCallback[[]byte](func(packet []byte) {
            log.Printf("Dropped packet: %d bytes", len(packet))
        }),
    )
}
Rate-Limited Processing
func processWithRateLimit(buf buffer.Buffer[*Task]) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for range ticker.C {
        // Process batch every 100ms
        batch := buf.ReadBatch(10)
        for _, task := range batch {
            processTask(task)
        }
    }
}
Producer-Consumer Pattern
func producer(buf buffer.Buffer[*Event]) {
    for event := range eventStream {
        if err := buf.Write(event); err != nil {
            log.Printf("Buffer full: %v", err)
        }
    }
}

func consumer(buf buffer.Buffer[*Event]) {
    for {
        if event, ok := buf.Read(); ok {
            handleEvent(event)
        } else {
            time.Sleep(10 * time.Millisecond)
        }
    }
}

Architecture

Observability: Dual Tracking Pattern

The buffer package tracks operations through two independent systems:

flowchart LR
    A[Buffer Operation] --> B[Statistics]
    A --> C[Metrics]

    B --> D[Atomic Counters]
    B --> E[Computed Values]

    C --> F[Prometheus Counters]
    C --> G[Prometheus Gauges]

    D --> H[buf.Stats API]
    E --> H

    F --> I[/metrics endpoint]
    G --> I

    style A fill:#e1f5ff
    style B fill:#d4edda
    style C fill:#fff3cd
    style H fill:#d4edda
    style I fill:#fff3cd

Why Track Twice?

Both Statistics and Metrics independently track operations, which appears redundant but serves distinct purposes:

Aspect           Statistics (Always On)                 Metrics (Optional)
Purpose          Local debugging & programmatic access  Time-series monitoring & dashboards
Dependency       None (atomic operations)               Prometheus registry
Computed Values  Throughput, drop rate, utilization     Raw counters/gauges only
Access           buf.Stats() API                        /metrics HTTP endpoint
Overhead         ~50ns/op                               ~50ns/op (when enabled)
Use Case         Tests, debugging, local monitoring     Production dashboards, alerting

Performance Trade-off:

  • Dual tracking overhead: ~5% per operation when metrics enabled
  • At 100k ops/sec: 0.5-1% total overhead
  • Negligible cost for comprehensive observability

Alternative Considered: Reading Statistics from Prometheus metrics to avoid duplication.

Rejected because:

  • Creates Prometheus dependency for basic stats
  • 10x slower (reading from Prometheus vs atomic operations)
  • Breaks Statistics when metrics disabled
  • Violates separation of concerns
Data Flow Architecture
flowchart TB
    subgraph Producers
        P1[Producer 1]
        P2[Producer 2]
        P3[Producer N]
    end

    subgraph CircularBuffer["Circular Buffer (Pre-allocated Array)"]
        direction LR
        H[Head] --> B1[Item]
        B1 --> B2[Item]
        B2 --> B3[...]
        B3 --> B4[Item]
        B4 --> T[Tail]
        T -.-> H
    end

    subgraph Consumers
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer N]
    end

    P1 -->|Write| CircularBuffer
    P2 -->|Write| CircularBuffer
    P3 -->|Write| CircularBuffer

    CircularBuffer -->|Read| C1
    CircularBuffer -->|Read| C2
    CircularBuffer -->|Read| C3

    CircularBuffer -.->|Track| Stats[Statistics]
    CircularBuffer -.->|Export| Prom[Prometheus]

    style CircularBuffer fill:#e1f5ff
    style Stats fill:#d4edda
    style Prom fill:#fff3cd
Overflow Policy Behavior
stateDiagram-v2
    [*] --> BufferNotFull: Write
    BufferNotFull --> BufferNotFull: Write (space available)
    BufferNotFull --> BufferFull: Write (reaches capacity)

    BufferFull --> DropOldest: Policy = DropOldest
    BufferFull --> DropNewest: Policy = DropNewest
    BufferFull --> Block: Policy = Block

    DropOldest --> BufferNotFull: Remove oldest, add new
    DropNewest --> BufferFull: Drop new, keep existing
    Block --> BufferNotFull: Wait for Read

    BufferFull --> BufferNotFull: Read (space freed)
Architecture Decisions
Why Stats Are Always On

Statistics collection is mandatory because:

  • Observability is critical for understanding buffer behavior
  • Negligible overhead (atomic operations ~50ns)
  • Drop rates and overflow metrics inform capacity planning
  • Throughput metrics help identify bottlenecks
  • No external dependencies required for basic monitoring
Why Functional Options

We chose functional options over struct-based configuration because:

  • More idiomatic Go pattern
  • Composable and extensible - easy to add features
  • Clear intent with named functions
  • Capacity as required parameter makes API clearer
  • Backward compatible when adding new options
Why Circular Buffer

Circular buffers provide:

  • O(1) operations for write and read
  • Predictable memory usage (pre-allocated)
  • Zero allocations during operation
  • Cache-friendly memory access patterns
  • Lock-free reads for size checks
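A minimal ring-buffer sketch of the head/index arithmetic behind those O(1) guarantees (an illustrative toy, not the package's implementation):

```go
package main

import "fmt"

// ring is a toy fixed-capacity circular buffer over a pre-allocated slice.
type ring struct {
	items []int
	head  int // index of the oldest item
	size  int
}

func newRing(capacity int) *ring { return &ring{items: make([]int, capacity)} }

// write stores at (head+size) mod capacity: O(1), no allocation.
func (r *ring) write(v int) bool {
	if r.size == len(r.items) {
		return false // full; a real buffer would apply its overflow policy here
	}
	r.items[(r.head+r.size)%len(r.items)] = v
	r.size++
	return true
}

// read removes the oldest item at head: O(1).
func (r *ring) read() (int, bool) {
	if r.size == 0 {
		return 0, false
	}
	v := r.items[r.head]
	r.head = (r.head + 1) % len(r.items)
	r.size--
	return v, true
}

func main() {
	r := newRing(2)
	r.write(1)
	r.write(2)
	fmt.Println(r.write(3)) // false: buffer is full
	v, _ := r.read()
	fmt.Println(v) // oldest item comes out first
}
```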

Thread Safety

All buffer operations are thread-safe:

  • Multiple producers can write concurrently
  • Multiple consumers can read concurrently
  • Statistics use atomic operations
  • Condition variables for Block policy
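A condition-variable sketch of how a Block-policy write might wait for space (illustrative, not the package's internals):

```go
package main

import (
	"fmt"
	"sync"
)

// blockingBuf sketches the Block overflow policy with sync.Cond:
// writers wait on the condition until a reader frees a slot.
type blockingBuf struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
	cap   int
}

func newBlockingBuf(capacity int) *blockingBuf {
	b := &blockingBuf{cap: capacity}
	b.cond = sync.NewCond(&b.mu)
	return b
}

func (b *blockingBuf) write(v int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for len(b.items) == b.cap { // always re-check the condition after waking
		b.cond.Wait()
	}
	b.items = append(b.items, v)
}

func (b *blockingBuf) read() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	v := b.items[0]
	b.items = b.items[1:]
	b.cond.Signal() // wake one blocked writer
	return v
}

func main() {
	b := newBlockingBuf(1)
	b.write(1)
	done := make(chan struct{})
	go func() { b.write(2); close(done) }() // blocks until a read frees a slot
	fmt.Println(b.read())                   // unblocks the waiting writer
	<-done
	fmt.Println(b.read())
}
```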

Examples

UDP Packet Buffer
func setupUDPBuffer(registry *metric.MetricsRegistry) (buffer.Buffer[[]byte], error) {
    // High-capacity buffer for UDP packets with monitoring
    return buffer.NewCircularBuffer[[]byte](5000,
        buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
        buffer.WithMetrics[[]byte](registry, "udp_receiver"),
        buffer.WithDropCallback[[]byte](func(packet []byte) {
            atomic.AddInt64(&droppedPackets, 1)
        }),
    )
}
Event Processing Pipeline
func setupEventPipeline(ctx context.Context, registry *metric.MetricsRegistry) {
    // Buffer between event source and processor
    eventBuffer, _ := buffer.NewCircularBuffer[*Event](1000,
        buffer.WithOverflowPolicy[*Event](buffer.Block),
        buffer.WithMetrics[*Event](registry, "event_pipeline"),
    )
    
    // Producer
    go func() {
        for event := range eventSource {
            ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
            if err := eventBuffer.WriteWithContext(ctx, event); err != nil {
                log.Printf("Failed to buffer event: %v", err)
            }
            cancel()
        }
    }()
    
    // Consumer
    go func() {
        for {
            select {
            case <-ctx.Done():
                return
            default:
                if event, ok := eventBuffer.Read(); ok {
                    processEvent(event)
                } else {
                    time.Sleep(10 * time.Millisecond) // avoid busy-spinning when empty
                }
            }
        }
    }()
}

Contributing

When implementing new buffer types:

  1. Statistics must always be initialized
  2. Follow functional options pattern
  3. Support optional Prometheus metrics
  4. Maintain thread safety
  5. Include comprehensive tests with race detection
  6. Benchmark performance characteristics

License

See LICENSE file in repository root.

Documentation

Overview

Package buffer provides generic, thread-safe buffer implementations with various overflow policies.

This package offers flexible buffer types:

  • CircularBuffer: Fixed-size buffer with configurable overflow policies
  • Support for DropOldest, DropNewest, and Block overflow policies
  • Statistics always enabled for observability
  • Optional Prometheus metrics integration via functional options

All buffer implementations are thread-safe and always collect statistics for observability. Prometheus metrics can be optionally enabled via WithMetrics() functional option.

Package buffer provides thread-safe circular buffers with configurable overflow policies, built-in statistics tracking, and optional Prometheus metrics integration.

Overview

The buffer package implements high-performance circular buffers for managing data flow between producers and consumers in concurrent systems. Buffers are generic, thread-safe, and provide comprehensive observability through always-on statistics and optional metrics.

Quick Start

Basic buffer creation:

buf, err := buffer.NewCircularBuffer[int](1000)
if err != nil {
	log.Fatal(err)
}

// Write data
err = buf.Write(42)

// Read data
value, ok := buf.Read()

With overflow policy and metrics:

buf, err := buffer.NewCircularBuffer[[]byte](5000,
	buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
	buffer.WithMetrics[[]byte](registry, "network_input"),
)

Overflow Policies

The buffer supports three overflow behaviors when capacity is reached:

  • DropOldest: Remove oldest item to make room (default)
  • DropNewest: Reject new items when full
  • Block: Write operations wait for available space

Example with blocking policy:

buf, _ := buffer.NewCircularBuffer[*Event](100,
	buffer.WithOverflowPolicy[*Event](buffer.Block),
)

// Write with timeout when using Block policy
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := buf.WriteWithContext(ctx, event)

Observability Architecture

The buffer package implements a dual-tracking pattern for comprehensive observability:

Statistics (Always On):

  • Tracks all operations using atomic counters
  • Zero configuration required
  • Available via buf.Stats()
  • Provides computed metrics (throughput, drop rate, utilization)
  • No external dependencies

Prometheus Metrics (Optional):

  • Enabled via WithMetrics() option
  • Exports to Prometheus for time-series monitoring
  • Includes component labels for instance identification
  • Standard metric types (Counter, Gauge)

Design Decision: Dual Tracking Pattern

Both Statistics and Metrics track operations independently, which appears redundant but serves distinct operational purposes:

Why Track Twice?

1. Independence: Statistics work without Prometheus dependency

  • Always available for debugging, even in minimal deployments
  • No external infrastructure required for basic observability

2. Computed Metrics: Statistics provide derived values not available in raw Prometheus

  • Throughput (ops/sec) with built-in timing
  • Drop rate as percentage (drops / writes)
  • Overflow rate as percentage (overflows / writes)
  • Utilization relative to capacity

3. Different Use Cases:

  • Statistics: Programmatic access, debugging, tests, local monitoring
  • Metrics: Time-series analysis, dashboards, alerting, production monitoring

4. Performance Trade-off:

  • Overhead: ~50-100ns per operation for dual tracking
  • At 100k ops/sec: ~0.5-1% total overhead
  • Cost is negligible compared to observability value

Alternative Considered: Metrics-Based Statistics

We considered reading Statistics from Prometheus metrics to avoid duplication:

func (s *Statistics) Writes() int64 {
	m := &dto.Metric{} // avoid shadowing the dto package name
	s.metrics.writes.Write(m)
	return int64(m.Counter.GetValue())
}

Rejected because:

  • Creates Prometheus dependency for basic stats
  • Reading from Prometheus is slower (~10x) than atomic operations
  • Breaks Statistics when metrics are disabled
  • Violates separation of concerns

Performance Impact

Dual tracking overhead per operation:

  • 1x atomic increment (Statistics)
  • 1x atomic increment (Prometheus counter) if enabled
  • 1x gauge set (Prometheus) if enabled

Benchmarks (M3 MacBook Pro):

  • Write with stats only: ~97ns/op
  • Write with stats + metrics: ~102ns/op (~5% overhead)
  • Read with stats only: ~69ns/op
  • Read with stats + metrics: ~72ns/op (~4% overhead)

At high throughput (100k ops/sec), dual tracking adds ~5-10ms/sec of overhead, which is acceptable for the operational visibility gained.
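The arithmetic behind that estimate, for the ~50-100ns range quoted above:

```go
package main

import "fmt"

func main() {
	// At 100k ops/sec, each extra nanosecond per op costs opsPerSec * ns / 1e6 ms/sec.
	opsPerSec := 100_000.0
	for _, ns := range []float64{50, 100} {
		fmt.Printf("%.0f ms/sec\n", opsPerSec*ns/1e6)
	}
}
```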

Thread Safety

All buffer operations are thread-safe for concurrent use:

  • Multiple producers can write concurrently
  • Multiple consumers can read concurrently
  • Statistics use atomic operations (lock-free)
  • Metrics use Prometheus atomic types
  • Internal state protected by sync.RWMutex
  • Block policy uses sync.Cond for waiting

API Design Patterns

Functional Options:

The package uses functional options for clean, composable configuration:

buf, _ := buffer.NewCircularBuffer[T](capacity,
	buffer.WithOverflowPolicy[T](policy),
	buffer.WithMetrics[T](registry, prefix),
	buffer.WithDropCallback[T](callback),
)

This pattern provides:

  • Clear intent with named functions
  • Easy composition of features
  • Backward compatibility when adding options
  • Type-safe configuration

Generic Types:

Buffers are fully generic and work with any Go type:

intBuffer, _ := buffer.NewCircularBuffer[int](100)
byteBuffer, _ := buffer.NewCircularBuffer[[]byte](1000)
structBuffer, _ := buffer.NewCircularBuffer[*MyStruct](500)

Performance Characteristics

Operations:

  • Write: O(1) constant time
  • Read: O(1) constant time
  • ReadBatch: O(n) where n is batch size
  • Peek: O(1) constant time
  • Size/IsFull/IsEmpty: O(1) constant time

Memory:

  • Pre-allocated circular array
  • No dynamic allocations during operation
  • Memory usage: capacity * sizeof(T)
  • Statistics overhead: ~200 bytes
  • Metrics overhead: ~1KB when enabled

Common Use Cases

Network Packet Buffering:

udpBuffer, _ := buffer.NewCircularBuffer[[]byte](10000,
	buffer.WithOverflowPolicy[[]byte](buffer.DropOldest),
	buffer.WithMetrics[[]byte](registry, "udp_input"),
)

Event Processing Pipeline:

eventBuffer, _ := buffer.NewCircularBuffer[*Event](1000,
	buffer.WithOverflowPolicy[*Event](buffer.Block),
	buffer.WithMetrics[*Event](registry, "events"),
)

Rate-Limited Processing:

taskBuffer, _ := buffer.NewCircularBuffer[*Task](500,
	buffer.WithOverflowPolicy[*Task](buffer.DropNewest),
	buffer.WithDropCallback[*Task](func(t *Task) {
		log.Printf("Dropped task: %s", t.ID)
	}),
)

Testing

The package includes comprehensive tests with race detection:

go test -race ./pkg/buffer

Benchmarks are available to validate performance:

go test -bench=. ./pkg/buffer

Examples

See buffer_test.go and examples_test.go for runnable examples that appear in godoc.

Index

Constants

const (
	// ContextKeyStats can be used to pass statistics through context.
	ContextKeyStats contextKey = "buffer-stats"
)

Variables

This section is empty.

Functions

func WithStats

func WithStats(ctx context.Context, stats *Statistics) context.Context

WithStats adds statistics to the context.

Types

type Buffer

type Buffer[T any] interface {
	// Write adds an item to the buffer. Returns an error if the operation fails.
	// Behavior depends on the overflow policy when buffer is full.
	Write(item T) error

	// Read retrieves and removes one item from the buffer.
	// Returns the item and true if successful, zero value and false if buffer is empty.
	Read() (T, bool)

	// ReadBatch retrieves and removes up to max items from the buffer.
	// Returns a slice containing the retrieved items (may be shorter than max).
	ReadBatch(max int) []T

	// Peek retrieves one item without removing it from the buffer.
	// Returns the item and true if successful, zero value and false if buffer is empty.
	Peek() (T, bool)

	// Size returns the current number of items in the buffer.
	Size() int

	// Capacity returns the maximum number of items the buffer can hold.
	Capacity() int

	// IsFull returns true if the buffer is at maximum capacity.
	IsFull() bool

	// IsEmpty returns true if the buffer contains no items.
	IsEmpty() bool

	// Clear removes all items from the buffer.
	Clear()

	// Stats returns buffer statistics (always available for observability).
	Stats() *Statistics

	// Close shuts down the buffer and releases any resources.
	Close() error
}

Buffer represents a generic buffer interface that all buffer implementations must satisfy. The buffer is parameterized by item type T for type safety.

func NewCircularBuffer

func NewCircularBuffer[T any](capacity int, options ...Option[T]) (Buffer[T], error)

NewCircularBuffer creates a new circular buffer with the specified capacity and options. Stats are ALWAYS collected for observability. Metrics are optional via WithMetrics(). Returns an error if metrics registration fails when metrics are requested. Capacity is required - all other configuration is via functional options.

type DropCallback

type DropCallback[T any] func(item T)

DropCallback is called when an item is dropped due to overflow policy. It receives the item that was dropped.

type Option

type Option[T any] func(*bufferOptions[T])

Option configures buffer behavior using the functional options pattern. This provides a clean, extensible API for configuring buffers.

func WithDropCallback

func WithDropCallback[T any](callback DropCallback[T]) Option[T]

WithDropCallback sets a callback function that is called when items are dropped. The callback receives the item that was dropped.

func WithMetrics

func WithMetrics[T any](registry *metric.MetricsRegistry, prefix string) Option[T]

WithMetrics enables Prometheus metrics export for buffer statistics. If registry is nil, this option is ignored. Registry should not be nil in normal usage - this handles edge cases gracefully.

func WithOverflowPolicy

func WithOverflowPolicy[T any](policy OverflowPolicy) Option[T]

WithOverflowPolicy sets the overflow behavior for the buffer. Defaults to DropOldest if not specified.

type OverflowPolicy

type OverflowPolicy int

OverflowPolicy defines how the buffer behaves when it reaches capacity.

const (
	// DropOldest removes the oldest item to make room for new items.
	DropOldest OverflowPolicy = iota

	// DropNewest drops new items when the buffer is full.
	DropNewest

	// Block causes Write operations to block until space is available.
	Block
)

func (OverflowPolicy) String

func (p OverflowPolicy) String() string

String returns a human-readable representation of the overflow policy.

type Statistics

type Statistics struct {
	// contains filtered or unexported fields
}

Statistics tracks buffer performance metrics.

func NewStatistics

func NewStatistics() *Statistics

NewStatistics creates a new statistics tracker.

func StatsFromContext

func StatsFromContext(ctx context.Context) (*Statistics, bool)

StatsFromContext retrieves statistics from the context.

func (*Statistics) CurrentSize

func (s *Statistics) CurrentSize() int64

CurrentSize returns the current number of items in the buffer.

func (*Statistics) Drop

func (s *Statistics) Drop()

Drop records an item drop due to overflow policy.

func (*Statistics) DropRate

func (s *Statistics) DropRate() float64

DropRate returns the percentage of writes that resulted in drops (0.0 to 1.0).

func (*Statistics) Drops

func (s *Statistics) Drops() int64

Drops returns the total number of dropped items.

func (*Statistics) MaxSize

func (s *Statistics) MaxSize() int64

MaxSize returns the maximum number of items the buffer has held.

func (*Statistics) MemoryUsage

func (s *Statistics) MemoryUsage() int64

MemoryUsage returns the estimated memory usage in bytes.

func (*Statistics) Overflow

func (s *Statistics) Overflow()

Overflow records a buffer overflow event.

func (*Statistics) OverflowRate

func (s *Statistics) OverflowRate() float64

OverflowRate returns the percentage of writes that caused overflows (0.0 to 1.0).

func (*Statistics) Overflows

func (s *Statistics) Overflows() int64

Overflows returns the total number of overflow events.

func (*Statistics) Peek

func (s *Statistics) Peek()

Peek records a buffer peek operation.

func (*Statistics) Peeks

func (s *Statistics) Peeks() int64

Peeks returns the total number of peek operations.

func (*Statistics) Read

func (s *Statistics) Read()

Read records a buffer read operation.

func (*Statistics) ReadThroughput

func (s *Statistics) ReadThroughput() float64

ReadThroughput returns the average number of reads per second.

func (*Statistics) Reads

func (s *Statistics) Reads() int64

Reads returns the total number of read operations.

func (*Statistics) Reset

func (s *Statistics) Reset()

Reset resets all statistics to zero.

func (*Statistics) Summary

func (s *Statistics) Summary() StatsSummary

Summary returns a snapshot of all statistics.

func (*Statistics) Throughput

func (s *Statistics) Throughput() float64

Throughput returns the average number of writes per second.

func (*Statistics) UpdateMemoryUsage

func (s *Statistics) UpdateMemoryUsage(usage int64)

UpdateMemoryUsage updates the estimated memory usage.

func (*Statistics) UpdateSize

func (s *Statistics) UpdateSize(size int64)

UpdateSize updates the current buffer size.

func (*Statistics) Uptime

func (s *Statistics) Uptime() time.Duration

Uptime returns how long the buffer has been running.

func (*Statistics) Utilization

func (s *Statistics) Utilization(capacity int64) float64

Utilization returns the current buffer utilization as a percentage (0.0 to 1.0).

func (*Statistics) Write

func (s *Statistics) Write()

Write records a buffer write operation.

func (*Statistics) Writes

func (s *Statistics) Writes() int64

Writes returns the total number of write operations.

type StatsSummary

type StatsSummary struct {
	Writes         int64         `json:"writes"`
	Reads          int64         `json:"reads"`
	Peeks          int64         `json:"peeks"`
	Overflows      int64         `json:"overflows"`
	Drops          int64         `json:"drops"`
	CurrentSize    int64         `json:"current_size"`
	MaxSize        int64         `json:"max_size"`
	MemoryUsage    int64         `json:"memory_usage"`
	Throughput     float64       `json:"throughput"`
	ReadThroughput float64       `json:"read_throughput"`
	DropRate       float64       `json:"drop_rate"`
	OverflowRate   float64       `json:"overflow_rate"`
	Uptime         time.Duration `json:"uptime"`
}

StatsSummary is a snapshot of all statistics.
