nntppool

package module
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2025 License: MIT Imports: 26 Imported by: 0

README

nntppool

An NNTP connection pool with retry and provider rotation.

Features

  • Connection pooling
  • Body download retry and yenc decode
  • Post article with retry and yenc encode
  • Stat article with retry
  • TLS support
  • Multiple providers with rotation. If an article is not found on one provider, the pool rotates to the next provider.
  • Backup providers. If all providers fail, the backup provider will be used for download. Useful for block accounts.
  • Dynamic reconfiguration - Update provider settings, add/remove providers, or change connection limits without interrupting service
  • Intelligent metrics system - Comprehensive metrics with rolling windows, automatic cleanup, and memory management to prevent infinite growth

Installation

To install the nntppool package, you can use go get:

go get github.com/javi11/nntppool

Since this package uses Rapidyenc, you will need to build it with CGO enabled.

Usage Example

package main

import (
    "context"
    "log"
    "os"
    "time"

    "github.com/javi11/nntppool"
)

func main() {
    // Configure the connection pool
    config := nntppool.Config{
        MinConnections: 5,
        MaxRetries:    3,
        Providers: []nntppool.UsenetProviderConfig{
            {
                Host:                          "news.example.com",
                Port:                          119,
                Username:                      "user",
                Password:                      "pass",
                MaxConnections:                10,
                MaxConnectionIdleTimeInSeconds: 300,
                TLS:                           false,
            },
            {
                Host:                          "news-backup.example.com",
                Port:                          119,
                Username:                      "user",
                Password:                      "pass",
                MaxConnections:                5,
                MaxConnectionIdleTimeInSeconds: 300,
                TLS:                           true,
                IsBackupProvider:              true,
            },
        },
    }

    // Create a new connection pool
    pool, err := nntppool.NewConnectionPool(config)
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Quit()

    // Example: Download an article
    ctx := context.Background()
    msgID := "<message-id@example.com>"
    file, err := os.Create("article.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    written, err := pool.Body(ctx, msgID, file, nil)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Downloaded %d bytes", written)

    // Example: Post an article
    article, err := os.Open("article.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer article.Close()

    err = pool.Post(ctx, article)
    if err != nil {
        log.Fatal(err)
    }

    // Example: Check if an article exists
    msgNum, err := pool.Stat(ctx, msgID, []string{"alt.binaries.test"})
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Article number: %d", msgNum)
}
Best Practices
  • Monitor Progress: Always check reconfiguration status, especially in production
  • Gradual Changes: Make incremental changes rather than large configuration overhauls
  • Error Handling: Handle reconfiguration errors gracefully and consider rollback strategies
  • Testing: Test configuration changes in development before applying to production

Metrics System

The connection pool includes a comprehensive metrics system that provides detailed insights into connection pool performance while intelligently managing memory usage to prevent infinite growth over time.

Key Features
  • Rolling Time Windows: Metrics are organized into configurable time windows (default: 1 hour)
  • Automatic Cleanup: Old metrics are automatically cleaned up based on retention policies
  • Memory Management: Built-in memory monitoring with configurable thresholds and automatic cleanup
  • Connection Tracking: Automatic detection and cleanup of stale connections
  • Data Compression: Historical data is compressed into summaries for long-term storage
  • Real-time Monitoring: Live metrics for active connections and pool performance
Getting Metrics
// Get comprehensive metrics snapshot
snapshot := pool.GetMetricsSnapshot()

fmt.Printf("Active connections: %d\n", snapshot.ActiveConnections)
fmt.Printf("Total bytes downloaded: %d\n", snapshot.TotalBytesDownloaded)
fmt.Printf("Download speed: %.2f bytes/sec\n", snapshot.DownloadSpeed)
fmt.Printf("Error rate: %.2f%%\n", snapshot.ErrorRate)
fmt.Printf("Memory usage: %d bytes\n", snapshot.CurrentMemoryUsage)

// Check daily and weekly summaries
if snapshot.DailySummary != nil {
    fmt.Printf("Daily summary: %d connections created\n", snapshot.DailySummary.TotalConnectionsCreated)
}

if snapshot.WeeklySummary != nil {
    fmt.Printf("Weekly average: %.2f connections/hour\n", snapshot.WeeklySummary.AverageConnectionsPerHour)
}
Configuring Metrics Retention
// Configure metrics retention policy
config := nntppool.MetricRetentionConfig{
    DetailedRetentionDuration: 48 * time.Hour,        // Keep detailed metrics for 2 days
    RotationInterval:          30 * time.Minute,      // Create new windows every 30 minutes  
    MaxHistoricalWindows:      96,                    // Keep 96 windows (2 days of 30-min windows)
    MemoryThresholdBytes:      50 * 1024 * 1024,      // Trigger cleanup at 50MB
    AutoCleanupEnabled:        true,                  // Enable automatic cleanup
}

// Apply the configuration
metrics := pool.GetMetrics() // Returns the pool's *PoolMetrics instance
metrics.SetRetentionConfig(config)
Manual Maintenance
// Perform manual cleanup and rotation check
metrics.PerformRotationCheck()

// Force connection cleanup
staleCount := metrics.ForceConnectionCleanup()
fmt.Printf("Cleaned up %d stale connections\n", staleCount)

// Get system status
status := metrics.GetRollingMetricsStatus()
fmt.Printf("Current window: %v to %v\n", status.CurrentWindowStartTime, status.CurrentWindowEndTime)
fmt.Printf("Historical windows: %d/%d\n", status.HistoricalWindowCount, status.MaxHistoricalWindows)

memory := metrics.GetMemoryUsage()
fmt.Printf("Memory: %d/%d bytes (%.1f%%)\n", 
    memory.AllocatedBytes, 
    memory.ThresholdBytes,
    float64(memory.AllocatedBytes)/float64(memory.ThresholdBytes)*100)
Metrics Available

The system tracks comprehensive metrics including:

Connection Metrics:

  • Total connections created/destroyed
  • Active connection count
  • Connection acquire/release operations
  • Connection age and lifecycle

Performance Metrics:

  • Download/upload speeds (recent and historical)
  • Command success rates
  • Error rates and retry counts
  • Acquire wait times

Traffic Metrics:

  • Bytes downloaded/uploaded
  • Articles retrieved/posted
  • Command counts and errors

System Metrics:

  • Memory usage and thresholds
  • Rolling window status
  • Connection cleanup statistics
  • Provider-specific metrics
Automatic Cleanup Behavior

The metrics system automatically:

  1. Rotates windows when time periods expire (e.g., every hour)
  2. Monitors memory usage every 5 minutes by default
  3. Cleans up stale connections every 30 seconds
  4. Compresses old data when retention periods are exceeded
  5. Triggers aggressive cleanup when memory thresholds are reached

This ensures that long-running applications maintain stable memory usage while preserving useful historical data for analysis and monitoring.

Development Setup

To set up the project for development, follow these steps:

  1. Clone the repository:
git clone https://github.com/javi11/nntppool.git
cd nntppool
  2. Install dependencies:
go mod download
  3. Run tests:
make test
  4. Lint the code:
make lint
  5. Generate mocks and other code:
make generate

Contributing

Contributions are welcome! Please open an issue or submit a pull request. See the CONTRIBUTING.md file for details.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Package nntppool is a generated GoMock package.

Package nntppool provides a connection pooling mechanism for NNTP connections.

Package nntppool is a generated GoMock package.

Index

Constants

View Source
const (
	SegmentAlreadyExistsErrCode = 441
	ToManyConnectionsErrCode    = 502
	ArticleNotFoundErrCode      = 430
	CanNotJoinGroup             = 411
	AuthenticationRequiredCode  = 401
	AuthenticationFailedCode    = 403
	InvalidUsernamePasswordCode = 480
)

Variables

View Source
var (
	ErrCapabilitiesUnpopulated    = errors.New("capabilities unpopulated")
	ErrNoSuchCapability           = errors.New("no such capability")
	ErrNilNntpConn                = errors.New("nil nntp connection")
	ErrNoProviderAvailable        = errors.New("no provider available, because possible max connections reached")
	ErrArticleNotFoundInProviders = errors.New("the article is not found in any of your providers")
	ErrFailedToPostInAllProviders = errors.New("failed to post in all providers")
)

Functions

func ExampleMetricsUsage added in v1.1.0

func ExampleMetricsUsage()

ExampleMetricsUsage demonstrates how to use the pool metrics system

func HealthCheckExample added in v1.1.0

func HealthCheckExample() bool

HealthCheckExample shows how to implement health checks using metrics

func MonitoringExample added in v1.1.0

func MonitoringExample()

MonitoringExample shows how to use metrics for continuous monitoring

func PerformanceTrackingExample added in v1.1.0

func PerformanceTrackingExample()

PerformanceTrackingExample demonstrates tracking operation performance

func TestProviderConnectivity added in v1.2.0

func TestProviderConnectivity(ctx context.Context, config UsenetProviderConfig, logger Logger, client nntpcli.Client) error

TestProviderConnectivity tests connectivity to a provider without requiring a pool. This is a standalone utility function that can be used independently. If client is nil, a default NNTP client will be created.

Types

type ActiveConnectionMetrics added in v1.1.1

type ActiveConnectionMetrics struct {
	Count                int           `json:"count"`
	TotalBytesDownloaded int64         `json:"total_bytes_downloaded"`
	TotalBytesUploaded   int64         `json:"total_bytes_uploaded"`
	TotalCommands        int64         `json:"total_commands"`
	TotalCommandErrors   int64         `json:"total_command_errors"`
	SuccessRate          float64       `json:"success_rate_percent"`
	AverageConnectionAge time.Duration `json:"average_connection_age"`
}

ActiveConnectionMetrics represents metrics for active connections only

type AggregatedMetrics added in v1.1.2

type AggregatedMetrics struct {
	TotalBytesDownloaded   int64
	TotalBytesUploaded     int64
	TotalCommands          int64
	TotalCommandErrors     int64
	TotalArticlesRetrieved int64
	TotalArticlesPosted    int64
	SuccessRate            float64
	AverageConnectionAge   time.Duration
}

type Config

type Config struct {
	Logger              Logger
	NntpCli             nntpcli.Client
	Providers           []UsenetProviderConfig
	HealthCheckInterval time.Duration
	MinConnections      int
	MaxRetries          uint
	DelayType           DelayType
	// Deprecated: now is always false
	SkipProvidersVerificationOnCreation bool
	RetryDelay                          time.Duration
	ShutdownTimeout                     time.Duration
	DefaultConnectionLease              time.Duration
	ProviderReconnectInterval           time.Duration
	ProviderMaxReconnectInterval        time.Duration
	ProviderHealthCheckStagger          time.Duration
	ProviderHealthCheckTimeout          time.Duration
	// contains filtered or unexported fields
}

func (*Config) GetProviders added in v1.0.0

func (c *Config) GetProviders() []config.ProviderConfig

Adapter methods for internal package interfaces

type ConnectionCleanupStats added in v1.4.0

type ConnectionCleanupStats struct {
	TrackedConnections int           `json:"tracked_connections"`
	LastCleanupTime    time.Time     `json:"last_cleanup_time"`
	CleanupInterval    time.Duration `json:"cleanup_interval"`
	ConnectionTimeout  time.Duration `json:"connection_timeout"`
	NextCleanupTime    time.Time     `json:"next_cleanup_time"`
}

ConnectionCleanupStats represents statistics about connection cleanup

type ConnectionCleanupTracker added in v1.4.0

type ConnectionCleanupTracker struct {
	// contains filtered or unexported fields
}

ConnectionCleanupTracker tracks connections for automatic cleanup

type ConnectionProviderInfo

type ConnectionProviderInfo struct {
	Host                     string        `json:"host"`
	Username                 string        `json:"username"`
	MaxConnections           int           `json:"maxConnections"`
	MaxConnectionIdleTimeout time.Duration `json:"maxConnectionIdleTimeout"`
	State                    ProviderState `json:"state"`
}

func (ConnectionProviderInfo) ID

type DelayType added in v0.3.0

type DelayType int
const (
	DelayTypeFixed DelayType = iota
	DelayTypeRandom
	DelayTypeExponential
)

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
	DebugContext(ctx context.Context, msg string, args ...any)
	InfoContext(ctx context.Context, msg string, args ...any)
	WarnContext(ctx context.Context, msg string, args ...any)
	ErrorContext(ctx context.Context, msg string, args ...any)
}

Logger interface compatible with slog.Logger

type MemoryUsage added in v1.4.0

type MemoryUsage struct {
	AllocatedBytes      uint64 `json:"allocated_bytes"`
	TotalAllocatedBytes uint64 `json:"total_allocated_bytes"`
	SystemBytes         uint64 `json:"system_bytes"`
	GCCount             uint32 `json:"gc_count"`
	ThresholdBytes      uint64 `json:"threshold_bytes"`
	ThresholdExceeded   bool   `json:"threshold_exceeded"`
}

MemoryUsage represents current memory usage information.

type MetricRetentionConfig added in v1.4.0

type MetricRetentionConfig struct {
	// How long to keep detailed metrics before rotation
	DetailedRetentionDuration time.Duration

	// How often to perform metric rotation
	RotationInterval time.Duration

	// Maximum number of historical windows to keep
	MaxHistoricalWindows int

	// Memory threshold (bytes) that triggers aggressive cleanup
	MemoryThresholdBytes uint64

	// Enable automatic cleanup when memory thresholds are exceeded
	AutoCleanupEnabled bool
}

MetricRetentionConfig defines how long different types of metrics are retained

type MetricSummary added in v1.4.0

type MetricSummary struct {
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`

	// Aggregated core metrics
	TotalConnectionsCreated   int64 `json:"total_connections_created"`
	TotalConnectionsDestroyed int64 `json:"total_connections_destroyed"`
	TotalAcquires             int64 `json:"total_acquires"`
	TotalReleases             int64 `json:"total_releases"`
	TotalErrors               int64 `json:"total_errors"`
	TotalRetries              int64 `json:"total_retries"`
	TotalAcquireWaitTime      int64 `json:"total_acquire_wait_time_ns"`

	// Aggregated traffic metrics
	TotalBytesDownloaded   int64 `json:"total_bytes_downloaded"`
	TotalBytesUploaded     int64 `json:"total_bytes_uploaded"`
	TotalArticlesRetrieved int64 `json:"total_articles_retrieved"`
	TotalArticlesPosted    int64 `json:"total_articles_posted"`
	TotalCommandCount      int64 `json:"total_command_count"`
	TotalCommandErrors     int64 `json:"total_command_errors"`

	// Computed metrics
	AverageConnectionsPerHour float64 `json:"average_connections_per_hour"`
	AverageErrorRate          float64 `json:"average_error_rate_percent"`
	AverageSuccessRate        float64 `json:"average_success_rate_percent"`
	AverageAcquireWaitTime    int64   `json:"average_acquire_wait_time_ns"`

	// Number of windows that were summarized
	WindowCount int `json:"window_count"`
}

MetricSummary represents aggregated metrics for a longer time period

type MetricWindow added in v1.4.0

type MetricWindow struct {
	StartTime time.Time `json:"start_time"`
	EndTime   time.Time `json:"end_time"`

	// Core metrics for this window
	ConnectionsCreated   int64 `json:"connections_created"`
	ConnectionsDestroyed int64 `json:"connections_destroyed"`
	Acquires             int64 `json:"acquires"`
	Releases             int64 `json:"releases"`
	Errors               int64 `json:"errors"`
	Retries              int64 `json:"retries"`
	AcquireWaitTime      int64 `json:"acquire_wait_time_ns"`

	// Traffic metrics
	BytesDownloaded   int64 `json:"bytes_downloaded"`
	BytesUploaded     int64 `json:"bytes_uploaded"`
	ArticlesRetrieved int64 `json:"articles_retrieved"`
	ArticlesPosted    int64 `json:"articles_posted"`
	CommandCount      int64 `json:"command_count"`
	CommandErrors     int64 `json:"command_errors"`
}

MetricWindow represents metrics for a specific time window

type MockPooledConnection

type MockPooledConnection struct {
	// contains filtered or unexported fields
}

MockPooledConnection is a mock of PooledConnection interface.

func NewMockPooledConnection

func NewMockPooledConnection(ctrl *gomock.Controller) *MockPooledConnection

NewMockPooledConnection creates a new mock instance.

func (*MockPooledConnection) Close

func (m *MockPooledConnection) Close() error

Close mocks base method.

func (*MockPooledConnection) Connection

func (m *MockPooledConnection) Connection() nntpcli.Connection

Connection mocks base method.

func (*MockPooledConnection) CreatedAt

func (m *MockPooledConnection) CreatedAt() time.Time

CreatedAt mocks base method.

func (*MockPooledConnection) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockPooledConnection) ExtendLease added in v1.0.0

func (m *MockPooledConnection) ExtendLease(duration time.Duration)

ExtendLease mocks base method.

func (*MockPooledConnection) Free

func (m *MockPooledConnection) Free() error

Free mocks base method.

func (*MockPooledConnection) IsLeaseExpired added in v1.0.0

func (m *MockPooledConnection) IsLeaseExpired() bool

IsLeaseExpired mocks base method.

func (*MockPooledConnection) IsMarkedForReplacement added in v1.0.0

func (m *MockPooledConnection) IsMarkedForReplacement() bool

IsMarkedForReplacement mocks base method.

func (*MockPooledConnection) MarkForReplacement added in v1.0.0

func (m *MockPooledConnection) MarkForReplacement()

MarkForReplacement mocks base method.

func (*MockPooledConnection) Provider

Provider mocks base method.

type MockPooledConnectionMockRecorder

type MockPooledConnectionMockRecorder struct {
	// contains filtered or unexported fields
}

MockPooledConnectionMockRecorder is the mock recorder for MockPooledConnection.

func (*MockPooledConnectionMockRecorder) Close

Close indicates an expected call of Close.

func (*MockPooledConnectionMockRecorder) Connection

func (mr *MockPooledConnectionMockRecorder) Connection() *gomock.Call

Connection indicates an expected call of Connection.

func (*MockPooledConnectionMockRecorder) CreatedAt

func (mr *MockPooledConnectionMockRecorder) CreatedAt() *gomock.Call

CreatedAt indicates an expected call of CreatedAt.

func (*MockPooledConnectionMockRecorder) ExtendLease added in v1.0.0

func (mr *MockPooledConnectionMockRecorder) ExtendLease(duration any) *gomock.Call

ExtendLease indicates an expected call of ExtendLease.

func (*MockPooledConnectionMockRecorder) Free

Free indicates an expected call of Free.

func (*MockPooledConnectionMockRecorder) IsLeaseExpired added in v1.0.0

func (mr *MockPooledConnectionMockRecorder) IsLeaseExpired() *gomock.Call

IsLeaseExpired indicates an expected call of IsLeaseExpired.

func (*MockPooledConnectionMockRecorder) IsMarkedForReplacement added in v1.0.0

func (mr *MockPooledConnectionMockRecorder) IsMarkedForReplacement() *gomock.Call

IsMarkedForReplacement indicates an expected call of IsMarkedForReplacement.

func (*MockPooledConnectionMockRecorder) MarkForReplacement added in v1.0.0

func (mr *MockPooledConnectionMockRecorder) MarkForReplacement() *gomock.Call

MarkForReplacement indicates an expected call of MarkForReplacement.

func (*MockPooledConnectionMockRecorder) Provider

Provider indicates an expected call of Provider.

type MockUsenetConnectionPool

type MockUsenetConnectionPool struct {
	// contains filtered or unexported fields
}

MockUsenetConnectionPool is a mock of UsenetConnectionPool interface.

func NewMockUsenetConnectionPool

func NewMockUsenetConnectionPool(ctrl *gomock.Controller) *MockUsenetConnectionPool

NewMockUsenetConnectionPool creates a new mock instance.

func (*MockUsenetConnectionPool) Body

func (m *MockUsenetConnectionPool) Body(ctx context.Context, msgID string, w io.Writer, nntpGroups []string) (int64, error)

Body mocks base method.

func (*MockUsenetConnectionPool) BodyReader added in v1.0.0

func (m *MockUsenetConnectionPool) BodyReader(ctx context.Context, msgID string, nntpGroups []string) (nntpcli.ArticleBodyReader, error)

BodyReader mocks base method.

func (*MockUsenetConnectionPool) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockUsenetConnectionPool) GetActiveReconfigurations added in v1.0.0

func (m *MockUsenetConnectionPool) GetActiveReconfigurations() map[string]*ReconfigurationStatus

GetActiveReconfigurations mocks base method.

func (*MockUsenetConnectionPool) GetConnection

func (m *MockUsenetConnectionPool) GetConnection(ctx context.Context, skipProviders []string, useBackupProviders bool) (PooledConnection, error)

GetConnection mocks base method.

func (*MockUsenetConnectionPool) GetMetrics added in v1.1.0

func (m *MockUsenetConnectionPool) GetMetrics() *PoolMetrics

GetMetrics mocks base method.

func (*MockUsenetConnectionPool) GetMetricsSnapshot added in v1.1.0

func (m *MockUsenetConnectionPool) GetMetricsSnapshot() PoolMetricsSnapshot

GetMetricsSnapshot mocks base method.

func (*MockUsenetConnectionPool) GetProviderStatus added in v1.2.0

func (m *MockUsenetConnectionPool) GetProviderStatus(providerID string) (*ProviderInfo, bool)

GetProviderStatus mocks base method.

func (*MockUsenetConnectionPool) GetProvidersInfo

func (m *MockUsenetConnectionPool) GetProvidersInfo() []ProviderInfo

GetProvidersInfo mocks base method.

func (*MockUsenetConnectionPool) GetReconfigurationStatus added in v1.0.0

func (m *MockUsenetConnectionPool) GetReconfigurationStatus(migrationID string) (*ReconfigurationStatus, bool)

GetReconfigurationStatus mocks base method.

func (*MockUsenetConnectionPool) Post

Post mocks base method.

func (*MockUsenetConnectionPool) Quit

func (m *MockUsenetConnectionPool) Quit()

Quit mocks base method.

func (*MockUsenetConnectionPool) Reconfigure added in v1.0.0

func (m *MockUsenetConnectionPool) Reconfigure(arg0 ...Config) error

Reconfigure mocks base method.

func (*MockUsenetConnectionPool) Stat

func (m *MockUsenetConnectionPool) Stat(ctx context.Context, msgID string, nntpGroups []string) (int, error)

Stat mocks base method.

type MockUsenetConnectionPoolMockRecorder

type MockUsenetConnectionPoolMockRecorder struct {
	// contains filtered or unexported fields
}

MockUsenetConnectionPoolMockRecorder is the mock recorder for MockUsenetConnectionPool.

func (*MockUsenetConnectionPoolMockRecorder) Body

func (mr *MockUsenetConnectionPoolMockRecorder) Body(ctx, msgID, w, nntpGroups any) *gomock.Call

Body indicates an expected call of Body.

func (*MockUsenetConnectionPoolMockRecorder) BodyReader added in v1.0.0

func (mr *MockUsenetConnectionPoolMockRecorder) BodyReader(ctx, msgID, nntpGroups any) *gomock.Call

BodyReader indicates an expected call of BodyReader.

func (*MockUsenetConnectionPoolMockRecorder) GetActiveReconfigurations added in v1.0.0

func (mr *MockUsenetConnectionPoolMockRecorder) GetActiveReconfigurations() *gomock.Call

GetActiveReconfigurations indicates an expected call of GetActiveReconfigurations.

func (*MockUsenetConnectionPoolMockRecorder) GetConnection

func (mr *MockUsenetConnectionPoolMockRecorder) GetConnection(ctx, skipProviders, useBackupProviders any) *gomock.Call

GetConnection indicates an expected call of GetConnection.

func (*MockUsenetConnectionPoolMockRecorder) GetMetrics added in v1.1.0

GetMetrics indicates an expected call of GetMetrics.

func (*MockUsenetConnectionPoolMockRecorder) GetMetricsSnapshot added in v1.1.0

func (mr *MockUsenetConnectionPoolMockRecorder) GetMetricsSnapshot() *gomock.Call

GetMetricsSnapshot indicates an expected call of GetMetricsSnapshot.

func (*MockUsenetConnectionPoolMockRecorder) GetProviderStatus added in v1.2.0

func (mr *MockUsenetConnectionPoolMockRecorder) GetProviderStatus(providerID any) *gomock.Call

GetProviderStatus indicates an expected call of GetProviderStatus.

func (*MockUsenetConnectionPoolMockRecorder) GetProvidersInfo

func (mr *MockUsenetConnectionPoolMockRecorder) GetProvidersInfo() *gomock.Call

GetProvidersInfo indicates an expected call of GetProvidersInfo.

func (*MockUsenetConnectionPoolMockRecorder) GetReconfigurationStatus added in v1.0.0

func (mr *MockUsenetConnectionPoolMockRecorder) GetReconfigurationStatus(migrationID any) *gomock.Call

GetReconfigurationStatus indicates an expected call of GetReconfigurationStatus.

func (*MockUsenetConnectionPoolMockRecorder) Post

Post indicates an expected call of Post.

func (*MockUsenetConnectionPoolMockRecorder) Quit

Quit indicates an expected call of Quit.

func (*MockUsenetConnectionPoolMockRecorder) Reconfigure added in v1.0.0

func (mr *MockUsenetConnectionPoolMockRecorder) Reconfigure(arg0 ...any) *gomock.Call

Reconfigure indicates an expected call of Reconfigure.

func (*MockUsenetConnectionPoolMockRecorder) Stat

func (mr *MockUsenetConnectionPoolMockRecorder) Stat(ctx, msgID, nntpGroups any) *gomock.Call

Stat indicates an expected call of Stat.

type Option

type Option func(*Config)

type PoolMetrics added in v1.1.0

type PoolMetrics struct {
	// contains filtered or unexported fields
}

PoolMetrics provides high-performance metrics for the entire connection pool. All operations use atomic operations for thread-safety and minimal overhead.

func NewPoolMetrics added in v1.1.0

func NewPoolMetrics() *PoolMetrics

NewPoolMetrics creates a new metrics instance

func (*PoolMetrics) CompressOldWindows added in v1.4.0

func (m *PoolMetrics) CompressOldWindows(createSummaries bool) []*MetricSummary

CompressOldWindows removes detailed windows older than the retention period and optionally creates summaries before deletion

func (*PoolMetrics) EnableAutoCleanup added in v1.4.0

func (m *PoolMetrics) EnableAutoCleanup(enabled bool)

EnableAutoCleanup enables or disables automatic cleanup when memory thresholds are exceeded

func (*PoolMetrics) ForceConnectionCleanup added in v1.4.0

func (m *PoolMetrics) ForceConnectionCleanup() int

ForceConnectionCleanup forces an immediate cleanup of stale connections

func (*PoolMetrics) GetActiveConnectionMetrics added in v1.1.1

func (m *PoolMetrics) GetActiveConnectionMetrics() ActiveConnectionMetrics

GetActiveConnectionMetrics returns real-time metrics for currently active connections

func (*PoolMetrics) GetAverageAcquireWaitTime added in v1.1.0

func (m *PoolMetrics) GetAverageAcquireWaitTime() time.Duration

func (*PoolMetrics) GetConnectionCleanupStats added in v1.4.0

func (m *PoolMetrics) GetConnectionCleanupStats() ConnectionCleanupStats

GetConnectionCleanupStats returns statistics about the connection cleanup system

func (*PoolMetrics) GetDailySummary added in v1.4.0

func (m *PoolMetrics) GetDailySummary() *MetricSummary

GetDailySummary returns a summary of metrics for the last 24 hours

func (*PoolMetrics) GetMemoryUsage added in v1.4.0

func (m *PoolMetrics) GetMemoryUsage() MemoryUsage

func (*PoolMetrics) GetRetentionConfig added in v1.4.0

func (m *PoolMetrics) GetRetentionConfig() MetricRetentionConfig

GetRetentionConfig returns the current metric retention configuration

func (*PoolMetrics) GetRollingMetricsStatus added in v1.4.0

func (m *PoolMetrics) GetRollingMetricsStatus() RollingMetricsStatus

func (*PoolMetrics) GetSnapshot added in v1.1.0

func (m *PoolMetrics) GetSnapshot(pools []*providerPool) PoolMetricsSnapshot

GetSnapshot returns a comprehensive snapshot of all metrics. This method aggregates data from puddle pools and individual connections.

func (*PoolMetrics) GetTotalAcquires added in v1.1.0

func (m *PoolMetrics) GetTotalAcquires() int64

func (*PoolMetrics) GetTotalActiveConnections added in v1.4.0

func (m *PoolMetrics) GetTotalActiveConnections() int64

GetTotalActiveConnections returns the number of actively tracked connections

func (*PoolMetrics) GetTotalConnectionsCreated added in v1.1.0

func (m *PoolMetrics) GetTotalConnectionsCreated() int64

Fast getters (single atomic load each)

func (*PoolMetrics) GetTotalConnectionsDestroyed added in v1.1.0

func (m *PoolMetrics) GetTotalConnectionsDestroyed() int64

func (*PoolMetrics) GetTotalErrors added in v1.1.0

func (m *PoolMetrics) GetTotalErrors() int64

func (*PoolMetrics) GetTotalReleases added in v1.1.0

func (m *PoolMetrics) GetTotalReleases() int64

func (*PoolMetrics) GetTotalRetries added in v1.1.0

func (m *PoolMetrics) GetTotalRetries() int64

func (*PoolMetrics) GetUptime added in v1.1.0

func (m *PoolMetrics) GetUptime() time.Duration

func (*PoolMetrics) GetWeeklySummary added in v1.4.0

func (m *PoolMetrics) GetWeeklySummary() *MetricSummary

GetWeeklySummary returns a summary of metrics for the last 7 days

func (*PoolMetrics) PerformRotationCheck added in v1.4.0

func (m *PoolMetrics) PerformRotationCheck()

PerformRotationCheck checks if metric rotation is needed and performs memory cleanup

func (*PoolMetrics) RecordAcquire added in v1.1.0

func (m *PoolMetrics) RecordAcquire()

func (*PoolMetrics) RecordAcquireWaitTime added in v1.1.0

func (m *PoolMetrics) RecordAcquireWaitTime(duration time.Duration)

Performance metrics

func (*PoolMetrics) RecordConnectionCreated added in v1.1.0

func (m *PoolMetrics) RecordConnectionCreated()

Connection lifecycle metrics

func (*PoolMetrics) RecordConnectionDestroyed added in v1.1.0

func (m *PoolMetrics) RecordConnectionDestroyed()

func (*PoolMetrics) RecordError added in v1.1.0

func (m *PoolMetrics) RecordError()

func (*PoolMetrics) RecordRelease added in v1.1.0

func (m *PoolMetrics) RecordRelease()

func (*PoolMetrics) RecordRetry added in v1.1.0

func (m *PoolMetrics) RecordRetry()

func (*PoolMetrics) RegisterActiveConnection added in v1.1.1

func (m *PoolMetrics) RegisterActiveConnection(connectionID string, conn nntpcli.Connection)

RegisterActiveConnection registers a connection as active for metrics tracking. This should be called when a connection is acquired from the pool.

func (*PoolMetrics) ResetCounters added in v1.4.0

func (m *PoolMetrics) ResetCounters()

ResetCounters resets all atomic counters while preserving rolling metrics. This can be useful for periodic resets to prevent integer overflow.

func (*PoolMetrics) SetConnectionCleanupConfig added in v1.4.0

func (m *PoolMetrics) SetConnectionCleanupConfig(cleanupInterval, connectionTimeout time.Duration)

SetConnectionCleanupConfig updates the connection cleanup configuration

func (*PoolMetrics) SetDetailedRetentionDuration added in v1.4.0

func (m *PoolMetrics) SetDetailedRetentionDuration(duration time.Duration)

SetDetailedRetentionDuration updates how long detailed metrics are kept before compression

func (*PoolMetrics) SetMaxHistoricalWindows added in v1.4.0

func (m *PoolMetrics) SetMaxHistoricalWindows(maxWindows int)

SetMaxHistoricalWindows updates the maximum number of historical windows to keep

func (*PoolMetrics) SetMemoryThreshold added in v1.4.0

func (m *PoolMetrics) SetMemoryThreshold(thresholdBytes uint64)

SetMemoryThreshold updates the memory threshold that triggers automatic cleanup

func (*PoolMetrics) SetRetentionConfig added in v1.4.0

func (m *PoolMetrics) SetRetentionConfig(config MetricRetentionConfig)

SetRetentionConfig updates the metric retention configuration

func (*PoolMetrics) SetRotationInterval added in v1.4.0

func (m *PoolMetrics) SetRotationInterval(interval time.Duration)

SetRotationInterval updates how often metrics are rotated to new time windows

func (*PoolMetrics) SetSpeedCacheDuration added in v1.1.2

func (m *PoolMetrics) SetSpeedCacheDuration(duration time.Duration)

SetSpeedCacheDuration configures how long speed calculations are cached. This reduces performance impact by avoiding frequent expensive calculations.

func (*PoolMetrics) SetSpeedWindowDuration added in v1.1.2

func (m *PoolMetrics) SetSpeedWindowDuration(duration time.Duration)

SetSpeedWindowDuration configures the time window used for speed calculations

func (*PoolMetrics) SummarizeHistoricalWindows added in v1.4.0

func (m *PoolMetrics) SummarizeHistoricalWindows(startTime, endTime time.Time) *MetricSummary

SummarizeHistoricalWindows creates a compressed summary of historical metric windows

func (*PoolMetrics) UnregisterActiveConnection added in v1.1.1

func (m *PoolMetrics) UnregisterActiveConnection(connectionID string)

UnregisterActiveConnection removes a connection from active tracking. This should be called when a connection is released back to the pool or destroyed.

func (*PoolMetrics) UpdateConnectionActivity added in v1.4.0

func (m *PoolMetrics) UpdateConnectionActivity(connectionID string)

UpdateConnectionActivity updates the last-seen time for an active connection. This should be called whenever a connection is used, so that it is not cleaned up as stale.

type PoolMetricsSnapshot added in v1.1.0

// PoolMetricsSnapshot provides a comprehensive view of pool metrics. It is
// only computed when explicitly requested, to minimize overhead.
//
// Every field carries a JSON tag. For the float64 fields the unit is spelled
// out in the tag suffix (…_bytes_per_sec, …_seconds, …_hours, …_percent).
// DailySummary and WeeklySummary are pointers tagged omitempty, so a nil
// summary is left out of the encoded JSON.
type PoolMetricsSnapshot struct {
	// Timestamp and uptime
	Timestamp time.Time     `json:"timestamp"`
	Uptime    time.Duration `json:"uptime"`

	// Connection metrics
	TotalConnectionsCreated   int64 `json:"total_connections_created"`
	TotalConnectionsDestroyed int64 `json:"total_connections_destroyed"`
	ActiveConnections         int64 `json:"active_connections"`
	TotalAcquires             int64 `json:"total_acquires"`
	TotalReleases             int64 `json:"total_releases"`

	// Pool utilization
	AcquiredConnections int32 `json:"acquired_connections"`
	IdleConnections     int32 `json:"idle_connections"`
	TotalConnections    int32 `json:"total_connections"`

	// Traffic metrics
	TotalBytesDownloaded    int64   `json:"total_bytes_downloaded"`
	TotalBytesUploaded      int64   `json:"total_bytes_uploaded"`
	TotalArticlesRetrieved  int64   `json:"total_articles_retrieved"`
	TotalArticlesPosted     int64   `json:"total_articles_posted"`
	DownloadSpeed           float64 `json:"download_speed_bytes_per_sec"`            // Recent speed (based on time window)
	UploadSpeed             float64 `json:"upload_speed_bytes_per_sec"`              // Recent speed (based on time window)
	HistoricalDownloadSpeed float64 `json:"historical_download_speed_bytes_per_sec"` // Average since pool start
	HistoricalUploadSpeed   float64 `json:"historical_upload_speed_bytes_per_sec"`   // Average since pool start
	SpeedCalculationWindow  float64 `json:"speed_calculation_window_seconds"`        // Time window used for recent speed
	SpeedCacheDuration      float64 `json:"speed_cache_duration_seconds"`            // Cache duration for speed calculations
	SpeedCacheAge           float64 `json:"speed_cache_age_seconds"`                 // Age of current cached speed values

	// Performance metrics (success rate is a percentage, per its JSON tag)
	TotalCommandCount      int64         `json:"total_command_count"`
	TotalCommandErrors     int64         `json:"total_command_errors"`
	CommandSuccessRate     float64       `json:"command_success_rate_percent"`
	AverageAcquireWaitTime time.Duration `json:"average_acquire_wait_time"`

	// Error metrics (error rate is a percentage, per its JSON tag)
	TotalErrors  int64   `json:"total_errors"`
	TotalRetries int64   `json:"total_retries"`
	ErrorRate    float64 `json:"error_rate_percent"`

	// Rolling metrics information (rotation interval in seconds, detailed
	// retention in hours, per the JSON tags)
	CurrentWindowStartTime  time.Time `json:"current_window_start_time"`
	CurrentWindowEndTime    time.Time `json:"current_window_end_time"`
	HistoricalWindowCount   int       `json:"historical_window_count"`
	WindowRotationInterval  float64   `json:"window_rotation_interval_seconds"`
	DetailedRetentionPeriod float64   `json:"detailed_retention_period_hours"`
	MaxHistoricalWindows    int       `json:"max_historical_windows"`

	// Memory management (threshold and usage in bytes, per the JSON tags)
	MemoryThreshold    uint64    `json:"memory_threshold_bytes"`
	CurrentMemoryUsage uint64    `json:"current_memory_usage_bytes"`
	AutoCleanupEnabled bool      `json:"auto_cleanup_enabled"`
	LastMemoryCheck    time.Time `json:"last_memory_check"`

	// Connection cleanup statistics
	ConnectionCleanupStats ConnectionCleanupStats `json:"connection_cleanup_stats"`

	// Daily and weekly summaries
	DailySummary  *MetricSummary `json:"daily_summary,omitempty"`
	WeeklySummary *MetricSummary `json:"weekly_summary,omitempty"`

	// Provider-specific metrics
	ProviderMetrics []ProviderMetricsSnapshot `json:"provider_metrics"`
}

PoolMetricsSnapshot provides a comprehensive view of pool metrics. It is only computed when explicitly requested, to minimize overhead.

type PooledConnection

// PooledConnection represents a managed NNTP connection from a connection
// pool. It wraps the underlying NNTP connection with pool management
// capabilities.
type PooledConnection interface {
	// Connection returns the wrapped nntpcli.Connection.
	Connection() nntpcli.Connection
	// Close and Free both report an error.
	// NOTE(review): presumably Close tears the connection down while Free
	// hands it back to the pool — confirm against the implementation.
	Close() error
	Free() error
	// Provider returns the ConnectionProviderInfo for this connection.
	Provider() ConnectionProviderInfo
	// CreatedAt returns a time.Time; presumably the connection's creation
	// time — TODO confirm.
	CreatedAt() time.Time
	// Lease handling. NOTE(review): what starts a lease and what happens on
	// expiry is not visible from this declaration.
	IsLeaseExpired() bool
	// Replacement flag: MarkForReplacement sets it, IsMarkedForReplacement
	// reads it (assumed from the paired names — verify).
	IsMarkedForReplacement() bool
	MarkForReplacement()
	// ExtendLease takes the extension as a time.Duration.
	ExtendLease(duration time.Duration)
}

PooledConnection represents a managed NNTP connection from a connection pool. It wraps the underlying NNTP connection with pool management capabilities.

type Provider

// Provider holds the host, credentials, port and connection limits of a
// provider. MaxConnectionIdleTimeInSeconds is expressed in seconds (per its
// name) and IsBackupProvider flags the provider as a backup provider.
//
// NOTE(review): Provider has no TLS, TTL or capability fields, unlike
// UsenetProviderConfig; how the two types relate is not visible here.
type Provider struct {
	Host                           string
	Username                       string
	Password                       string
	Port                           int
	MaxConnections                 int
	MaxConnectionIdleTimeInSeconds int
	IsBackupProvider               bool
}

func (*Provider) ID

func (p *Provider) ID() string

type ProviderChange added in v1.0.0

// ProviderChange represents a change to be made to a provider during
// migration. OldConfig and NewConfig are pointers tagged omitempty, so a nil
// config is left out of the encoded JSON.
type ProviderChange struct {
	ID         string                `json:"id"`
	ChangeType ProviderChangeType    `json:"change_type"`
	OldConfig  *UsenetProviderConfig `json:"old_config,omitempty"`
	NewConfig  *UsenetProviderConfig `json:"new_config,omitempty"`
	Priority   int                   `json:"priority"` // Migration priority (0 = highest)
}

ProviderChange represents a change to be made to a provider during migration

type ProviderChangeType added in v1.0.0

// ProviderChangeType represents the type of change for a provider. It is an
// int-backed enumeration and has a String method.
type ProviderChangeType int

ProviderChangeType represents the type of change for a provider

// ProviderChangeType values. They are assigned with iota, so they run from
// 0 (ProviderChangeKeep) to 3 (ProviderChangeRemove) in declaration order;
// inserting or reordering entries renumbers them.
const (
	ProviderChangeKeep   ProviderChangeType = iota // No change needed
	ProviderChangeUpdate                           // Settings changed, migrate connections
	ProviderChangeAdd                              // New provider, add gradually
	ProviderChangeRemove                           // Remove provider, drain connections
)

func (ProviderChangeType) String added in v1.0.0

func (pct ProviderChangeType) String() string

type ProviderConfig added in v1.0.0

// ProviderConfig is a helper interface for internal packages. It exposes a
// provider's settings through read-only accessors. *UsenetProviderConfig
// declares a method for each entry below with the same name and signature.
type ProviderConfig interface {
	// ID returns the provider identifier as a string.
	ID() string
	GetHost() string
	GetUsername() string
	GetPassword() string
	GetPort() int
	GetMaxConnections() int
	// The two limits below are expressed in seconds, per their names.
	GetMaxConnectionIdleTimeInSeconds() int
	GetMaxConnectionTTLInSeconds() int
	GetTLS() bool
	GetInsecureSSL() bool
	GetIsBackupProvider() bool
	GetVerifyCapabilities() []string
}

ProviderConfig is a helper interface for internal packages.

type ProviderInfo

// ProviderInfo describes a provider's connection usage, lifecycle state and
// connection/retry history. It is the type returned by
// UsenetConnectionPool.GetProvidersInfo and GetProviderStatus.
//
// Its JSON keys are camelCase (e.g. "usedConnections"), unlike the
// snake_case keys used by the metrics and reconfiguration types in this
// package.
type ProviderInfo struct {
	Host                     string        `json:"host"`
	Username                 string        `json:"username"`
	UsedConnections          int           `json:"usedConnections"`
	MaxConnections           int           `json:"maxConnections"`
	MaxConnectionIdleTimeout time.Duration `json:"maxConnectionIdleTimeout"`
	State                    ProviderState `json:"state"`
	LastConnectionAttempt    time.Time     `json:"lastConnectionAttempt"`
	LastSuccessfulConnect    time.Time     `json:"lastSuccessfulConnect"`
	FailureReason            string        `json:"failureReason"`
	RetryCount               int           `json:"retryCount"`
	NextRetryAt              time.Time     `json:"nextRetryAt"`
}

func (ProviderInfo) ID

func (p ProviderInfo) ID() string

type ProviderMetricsSnapshot added in v1.1.0

// ProviderMetricsSnapshot contains metrics for a specific provider. It is the
// element type of PoolMetricsSnapshot.ProviderMetrics.
type ProviderMetricsSnapshot struct {
	// Provider identity and current lifecycle state
	ProviderID string        `json:"provider_id"`
	Host       string        `json:"host"`
	Username   string        `json:"username"`
	State      ProviderState `json:"state"`

	// Pool statistics from puddle
	MaxConnections          int32         `json:"max_connections"`
	TotalConnections        int32         `json:"total_connections"`
	AcquiredConnections     int32         `json:"acquired_connections"`
	IdleConnections         int32         `json:"idle_connections"`
	ConstructingConnections int32         `json:"constructing_connections"`
	AcquireCount            int64         `json:"acquire_count"`
	AcquireDuration         time.Duration `json:"acquire_duration"`
	EmptyAcquireCount       int64         `json:"empty_acquire_count"`
	EmptyAcquireWaitTime    time.Duration `json:"empty_acquire_wait_time"`
	CanceledAcquireCount    int64         `json:"canceled_acquire_count"`

	// Connection-level aggregated metrics (success rate is a percentage,
	// per its JSON tag)
	TotalBytesDownloaded   int64         `json:"total_bytes_downloaded"`
	TotalBytesUploaded     int64         `json:"total_bytes_uploaded"`
	TotalCommands          int64         `json:"total_commands"`
	TotalCommandErrors     int64         `json:"total_command_errors"`
	SuccessRate            float64       `json:"success_rate_percent"`
	AverageConnectionAge   time.Duration `json:"average_connection_age"`
	TotalArticlesRetrieved int64         `json:"total_articles_retrieved"`
	TotalArticlesPosted    int64         `json:"total_articles_posted"`
}

ProviderMetricsSnapshot contains metrics for a specific provider

type ProviderState added in v1.0.0

// ProviderState represents the lifecycle state of a provider during
// configuration changes. It is an int-backed enumeration and has a String
// method.
type ProviderState int

ProviderState represents the lifecycle state of a provider during configuration changes

// ProviderState values. They are assigned with iota, so they run from
// 0 (ProviderStateActive) to 6 (ProviderStateAuthenticationFailed) in
// declaration order; inserting or reordering entries renumbers them.
const (
	ProviderStateActive               ProviderState = iota // Normal operation
	ProviderStateDraining                                  // Accepting no new connections, existing connections finishing
	ProviderStateMigrating                                 // In process of updating configuration
	ProviderStateRemoving                                  // Being removed from pool
	ProviderStateOffline                                   // Provider is offline/unreachable
	ProviderStateReconnecting                              // Currently attempting to reconnect
	ProviderStateAuthenticationFailed                      // Authentication failed, won't retry
)

func (ProviderState) String added in v1.0.0

func (ps ProviderState) String() string

type ProviderStatus added in v1.0.0

// ProviderStatus tracks the migration status of a single provider.
// CompletedAt is a pointer tagged omitempty and Error is tagged omitempty,
// so a nil CompletedAt or an empty Error is left out of the encoded JSON.
type ProviderStatus struct {
	ProviderID     string     `json:"provider_id"`
	Status         string     `json:"status"` // "pending", "migrating", "completed", "failed"
	ConnectionsOld int        `json:"connections_old"`
	ConnectionsNew int        `json:"connections_new"`
	StartTime      time.Time  `json:"start_time"`
	CompletedAt    *time.Time `json:"completed_at,omitempty"`
	Error          string     `json:"error,omitempty"`
}

ProviderStatus tracks the migration status of a single provider

type ReconfigurationStatus added in v1.0.0

// ReconfigurationStatus tracks the progress of a configuration
// reconfiguration: the planned Changes plus a per-provider Progress map.
// Error and CompletedAt are tagged omitempty, so they are left out of the
// encoded JSON when empty/nil.
type ReconfigurationStatus struct {
	ID          string                    `json:"id"`
	StartTime   time.Time                 `json:"start_time"`
	Status      string                    `json:"status"` // "running", "completed", "failed", "rolled_back"
	Changes     []ProviderChange          `json:"changes"`
	Progress    map[string]ProviderStatus `json:"progress"` // providerID -> status
	Error       string                    `json:"error,omitempty"`
	CompletedAt *time.Time                `json:"completed_at,omitempty"`
}

ReconfigurationStatus tracks the progress of a configuration reconfiguration

type RollingMetrics added in v1.4.0

// RollingMetrics manages metrics across multiple time windows. All of its
// fields are unexported, so its state cannot be read or set directly from
// outside the package.
type RollingMetrics struct {
	// contains filtered or unexported fields
}

RollingMetrics manages metrics across multiple time windows

type RollingMetricsStatus added in v1.4.0

// RollingMetricsStatus reports the current status of the rolling metrics
// system. The float64 fields are durations whose unit is spelled out in the
// JSON tag: seconds for CurrentWindowDuration, RotationInterval and
// MemoryCheckInterval, hours for DetailedRetentionDuration. MemoryThreshold
// is in bytes.
type RollingMetricsStatus struct {
	Enabled                   bool      `json:"enabled"`
	CurrentWindowActive       bool      `json:"current_window_active"`
	CurrentWindowStartTime    time.Time `json:"current_window_start_time"`
	CurrentWindowEndTime      time.Time `json:"current_window_end_time"`
	CurrentWindowDuration     float64   `json:"current_window_duration_seconds"`
	HistoricalWindowCount     int       `json:"historical_window_count"`
	MaxHistoricalWindows      int       `json:"max_historical_windows"`
	RotationInterval          float64   `json:"rotation_interval_seconds"`
	DetailedRetentionDuration float64   `json:"detailed_retention_duration_hours"`
	MemoryThreshold           uint64    `json:"memory_threshold_bytes"`
	AutoCleanupEnabled        bool      `json:"auto_cleanup_enabled"`
	LastMemoryCheck           time.Time `json:"last_memory_check"`
	MemoryCheckInterval       float64   `json:"memory_check_interval_seconds"`
}

RollingMetricsStatus describes the current status of the rolling metrics system, as returned by GetRollingMetricsStatus.

type UsenetConnectionPool

// UsenetConnectionPool is the interface returned by NewConnectionPool. It
// groups connection acquisition, article operations (Body, BodyReader, Post,
// Stat), provider inspection, reconfiguration and metrics access.
type UsenetConnectionPool interface {
	// GetConnection returns a PooledConnection or an error.
	// NOTE(review): skipProviders presumably lists provider IDs to avoid and
	// useBackupProviders presumably allows backup providers to be chosen —
	// confirm against the implementation.
	GetConnection(
		ctx context.Context,

		skipProviders []string,

		useBackupProviders bool,
	) (PooledConnection, error)
	// Body downloads the body of article msgID into w; the README lists body
	// download as retried and yenc-decoded. The int64 result is presumably
	// the number of bytes written — TODO confirm.
	Body(
		ctx context.Context,
		msgID string,
		w io.Writer,
		nntpGroups []string,
	) (int64, error)
	// BodyReader returns an nntpcli.ArticleBodyReader for article msgID
	// instead of writing to a caller-supplied io.Writer.
	BodyReader(
		ctx context.Context,
		msgID string,
		nntpGroups []string,
	) (nntpcli.ArticleBodyReader, error)
	// Post posts the article read from r; the README lists posting as
	// retried and yenc-encoded.
	Post(ctx context.Context, r io.Reader) error
	// Stat checks article msgID; the README lists stat as retried.
	// NOTE(review): the meaning of the int result is not visible here.
	Stat(ctx context.Context, msgID string, nntpGroups []string) (int, error)
	// GetProvidersInfo returns a ProviderInfo slice. GetProviderStatus looks
	// one provider up by ID; the bool presumably reports whether it was
	// found — TODO confirm.
	GetProvidersInfo() []ProviderInfo
	GetProviderStatus(providerID string) (*ProviderInfo, bool)
	// Reconfigure takes its Config variadically, like NewConnectionPool.
	Reconfigure(...Config) error
	// GetReconfigurationStatus looks a reconfiguration up by migration ID;
	// GetActiveReconfigurations returns a map of ReconfigurationStatus
	// (presumably keyed by that ID — verify).
	GetReconfigurationStatus(migrationID string) (*ReconfigurationStatus, bool)
	GetActiveReconfigurations() map[string]*ReconfigurationStatus
	// GetMetrics returns the *PoolMetrics collector; GetMetricsSnapshot
	// returns a PoolMetricsSnapshot by value.
	GetMetrics() *PoolMetrics
	GetMetricsSnapshot() PoolMetricsSnapshot
	// Quit takes no arguments and returns nothing.
	// NOTE(review): presumably shuts the pool down — confirm.
	Quit()
}

func NewConnectionPool

func NewConnectionPool(c ...Config) (UsenetConnectionPool, error)

type UsenetProviderConfig

// UsenetProviderConfig is the per-provider configuration entry; the README
// example supplies a slice of them as Config.Providers.
//
// MaxConnectionIdleTimeInSeconds and MaxConnectionTTLInSeconds are expressed
// in seconds, per their names. IsBackupProvider marks a backup provider,
// which the README describes as used for download when all other providers
// fail.
// NOTE(review): the exact effect of VerifyCapabilities and InsecureSSL is
// not visible from this declaration.
type UsenetProviderConfig struct {
	Host                           string
	Username                       string
	Password                       string
	VerifyCapabilities             []string
	Port                           int
	MaxConnections                 int
	MaxConnectionIdleTimeInSeconds int
	MaxConnectionTTLInSeconds      int
	TLS                            bool
	InsecureSSL                    bool
	IsBackupProvider               bool
}

func (*UsenetProviderConfig) GetHost added in v1.0.0

func (u *UsenetProviderConfig) GetHost() string

Adapter methods for internal package interfaces

func (*UsenetProviderConfig) GetInsecureSSL added in v1.0.0

func (u *UsenetProviderConfig) GetInsecureSSL() bool

func (*UsenetProviderConfig) GetIsBackupProvider added in v1.0.0

func (u *UsenetProviderConfig) GetIsBackupProvider() bool

func (*UsenetProviderConfig) GetMaxConnectionIdleTimeInSeconds added in v1.0.0

func (u *UsenetProviderConfig) GetMaxConnectionIdleTimeInSeconds() int

func (*UsenetProviderConfig) GetMaxConnectionTTLInSeconds added in v1.0.0

func (u *UsenetProviderConfig) GetMaxConnectionTTLInSeconds() int

func (*UsenetProviderConfig) GetMaxConnections added in v1.0.0

func (u *UsenetProviderConfig) GetMaxConnections() int

func (*UsenetProviderConfig) GetPassword added in v1.0.0

func (u *UsenetProviderConfig) GetPassword() string

func (*UsenetProviderConfig) GetPort added in v1.0.0

func (u *UsenetProviderConfig) GetPort() int

func (*UsenetProviderConfig) GetTLS added in v1.0.0

func (u *UsenetProviderConfig) GetTLS() bool

func (*UsenetProviderConfig) GetUsername added in v1.0.0

func (u *UsenetProviderConfig) GetUsername() string

func (*UsenetProviderConfig) GetVerifyCapabilities added in v1.0.0

func (u *UsenetProviderConfig) GetVerifyCapabilities() []string

func (*UsenetProviderConfig) ID

func (u *UsenetProviderConfig) ID() string

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL