Documentation ¶
Overview ¶
Package subscription provides guaranteed event delivery for event-sourced systems.
Delivery Guarantee ¶
The core invariant: if an event is written to the store, every registered consumer WILL process it. This is achieved through:
- Durable checkpoints track each consumer's position
- Catch-up on startup replays from checkpoint to head
- Periodic polling ensures no event is missed (belt and suspenders)
- Optional notifiers provide low-latency delivery
The Sequence Gap Problem ¶
When events are stored with auto-increment global sequences, concurrent writers can cause gaps: transaction A gets sequence 10, transaction B gets sequence 11, B commits first. A consumer checkpointing at 11 could miss 10.
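A toy simulation makes the race concrete (all names here are illustrative, not part of this package):

```go
package main

import "fmt"

// visibleUpTo returns the highest sequence a naive consumer would
// checkpoint at, given which assigned sequences are committed (visible)
// right now. Checkpointing at the highest visible sequence silently
// skips any lower sequence that was assigned but not yet committed.
func visibleUpTo(committed map[uint64]bool, head uint64) uint64 {
	var max uint64
	for seq := uint64(1); seq <= head; seq++ {
		if committed[seq] {
			max = seq
		}
	}
	return max
}

func main() {
	// Transaction A holds sequence 10, B holds 11; B commits first.
	committed := map[uint64]bool{11: true}
	cp := visibleUpTo(committed, 11)
	fmt.Println("consumer checkpoints at", cp) // 11

	// A commits afterwards; sequence 10 is below the checkpoint
	// and a checkpoint-driven reader will never revisit it.
	committed[10] = true
	fmt.Println("sequence 10 committed, but checkpoint is already", cp)
}
```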
Four approaches, in order of recommendation:
## 1. Single Writer (Recommended)
Use the CommandBus for single-writer-per-stream. One goroutine per stream means global sequences are always monotonic with no holes. No gaps, no complexity, fastest possible throughput.
This is the right answer for most applications. The other approaches exist as escape hatches for systems that can't use single writer.
## 2. Published Sequence Table
After committing events, update a separate "published_sequence" table with the highest committed sequence. Consumers read from this table instead of trusting the events table directly.
Trade-offs:
- Extra write per commit (small overhead)
- Reads are trivial — no gap detection needed
- Works with any relational database
- Slight delivery delay (published after commit)
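A minimal sketch of this approach; the table and column names are illustrative. Note that the publish step must itself be serialized (for example, by a single publisher or a row lock) so the watermark never advances past an uncommitted lower sequence:

```sql
-- Single-row watermark: the highest sequence known to be committed.
CREATE TABLE published_sequence (
    id       int PRIMARY KEY DEFAULT 1 CHECK (id = 1),
    sequence bigint NOT NULL DEFAULT 0
);

-- After the event-append transaction commits, advance the watermark.
UPDATE published_sequence SET sequence = $1 WHERE sequence < $1;

-- Consumers read only up to the watermark; no gap detection needed.
SELECT * FROM events
 WHERE global_seq > $checkpoint
   AND global_seq <= (SELECT sequence FROM published_sequence)
 ORDER BY global_seq;
```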
## 3. Postgres Transaction Visibility (pg_snapshot)
Postgres exposes transaction visibility via system columns:
SELECT *
  FROM events
 WHERE global_seq > $checkpoint
   AND xmin::text::bigint < pg_snapshot_xmin(pg_current_snapshot())::text::bigint
 ORDER BY global_seq;
This reads only guaranteed-committed rows. Mathematically correct — no heuristics, no timeouts.
Trade-offs:
- Postgres-specific (not portable)
- Unknown performance impact — the xmin/snapshot functions may be expensive under high write load. Benchmark for your workload.
- Requires understanding Postgres MVCC internals
## 4. Gap Detection with Timeout (Generic Fallback)
The subscription detects sequence gaps and waits for them to fill:
- Gap detected → wait GapTimeout (default 500ms)
- Gap fills within timeout → process normally
- Gap persists → handled per OnGapTimeout: stop and retry on the next poll (the default), or skip with a logged warning
- Known gaps tracked to avoid re-waiting
Trade-offs:
- Works with any store (generic)
- Adds latency on gaps (up to GapTimeout)
- Heuristic — could theoretically skip a very slow transaction (mitigated by generous timeout)
- Simple to understand and debug
This is the built-in fallback used by EventSubscription when gap detection is enabled (GapTimeout > 0).
Notifiers ¶
StoreNotifier is an optimization for low-latency delivery. Without it, subscriptions rely on periodic polling (default 500ms). With it, events are delivered within milliseconds.
Available notifiers:
- ChannelNotifier: in-process, for testing and memory stores
- (Future) PostgresNotifier: LISTEN/NOTIFY
- (Future) SQLiteNotifier: data_version polling
- (Future) NATSNotifier: subject pub/sub
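The notifier contract (non-blocking signal, with polling as the safety net) can be approximated in a few lines. This is a simplified stand-in for illustration, not the package's ChannelNotifier:

```go
package main

import "fmt"

// notifier fans a "new events at sequence N" hint out to one listener.
// The buffer of 1 plus the non-blocking send means a slow listener only
// coalesces signals; it never blocks the writer. Dropped signals are
// recovered by the subscription's periodic poll.
type notifier struct {
	ch chan uint64
}

func newNotifier() *notifier { return &notifier{ch: make(chan uint64, 1)} }

// Signal is called after appending events. It never blocks: if the
// buffer already holds an undelivered hint, the new one is dropped.
func (n *notifier) Signal(seq uint64) {
	select {
	case n.ch <- seq:
	default: // listener busy; polling will catch it
	}
}

// Notify returns the channel the subscription selects on.
func (n *notifier) Notify() <-chan uint64 { return n.ch }

func main() {
	n := newNotifier()
	n.Signal(1)
	n.Signal(2) // buffer full, listener has not drained yet: dropped
	fmt.Println(<-n.Notify()) // 1
	select {
	case s := <-n.Notify():
		fmt.Println("unexpected", s)
	default:
		fmt.Println("signal 2 was coalesced; polling covers it")
	}
}
```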
Lag Monitoring ¶
Subscriptions support opt-in lag monitoring via LagMonitorConfig. When configured, a background goroutine periodically compares the consumer's checkpoint position against the store head and invokes a callback with the current lag on every check interval (the Threshold field is retained for backward compatibility and does not gate the callback).
Use WithLagMonitor to enable via options, or set Config.LagMonitor directly:
cfg.LagMonitor = &subscription.LagMonitorConfig{
    Callback: func(name string, lag, pos, latest uint64) {
        slog.Warn("subscription lagging", "consumer", name, "lag", lag)
    },
    Threshold:     1000, // retained for backward compatibility; the callback fires on every check
    CheckInterval: 10 * time.Second,
}
The monitor runs for the lifetime of the subscription and stops when the subscription's context is cancelled.
Package subscription provides guaranteed event delivery with checkpoint-based catch-up and live notification support.
The core guarantee: if an event is written to the store, every registered consumer WILL process it. This works through:
- Checkpoint: each consumer tracks its last processed global sequence
- Catch-up: on start, replay from checkpoint to head
- Live: after caught up, use notifications + periodic polling
- Retry: failed events retry with backoff, then go to DLQ
Single-writer (via CommandBus) eliminates sequence gaps. For multi-writer scenarios, gap detection handles out-of-order commits. See GapAction and the GapTimeout field of Config.
Index ¶
- func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
- func WithCheckpointInfo(ctx context.Context, info CheckpointInfo) context.Context
- type BatchHandler
- type ChannelNotifier
- type Checkpoint
- type CheckpointInfo
- type Config
- type EventSubscription
- type GapAction
- type GlobalEvent
- type GlobalReader
- type Handler
- type HealthHandler
- type HealthOption
- type LagCallback
- type LagMonitor
- type LagMonitorConfig
- type LagTracker
- type MemoryCheckpoint
- type MemoryGlobalReader
- func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
- func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...struct{ Data E; EventType string }) []uint64
- func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
- func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
- func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
- type Option
- func WithCheckpointEvery[E any](n int) Option[E]
- func WithCheckpointFlushInterval[E any](d time.Duration) Option[E]
- func WithLagMonitor[E any](callback LagCallback, threshold uint64, checkInterval time.Duration) Option[E]
- func WithLeaderLock[E any](lock eskit.LockRegistry) Option[E]
- func WithNotifier[E any](n StoreNotifier) Option[E]
- type SequenceChecker
- type StoreAdapter
- type StoreNotifier
- type StoreReader
- type SubscriptionManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Add ¶
func Add[E any](m *SubscriptionManager, sub *EventSubscription[E]) error
Add registers and starts a subscription. Returns error if ID is duplicate.
func WithCheckpointInfo ¶
func WithCheckpointInfo(ctx context.Context, info CheckpointInfo) context.Context
WithCheckpointInfo attaches checkpoint info to the context. Exported for use in integration tests; production code uses this internally.
Types ¶
type BatchHandler ¶
type BatchHandler[E any] func(ctx context.Context, events []GlobalEvent[E]) error
BatchHandler processes a batch of events. Checkpoint advances only after the entire batch succeeds.
type ChannelNotifier ¶
type ChannelNotifier struct {
// contains filtered or unexported fields
}
ChannelNotifier is an in-process StoreNotifier backed by a Go channel. Use for in-memory stores and testing. Call Signal() after appending events.
func NewChannelNotifier ¶
func NewChannelNotifier() *ChannelNotifier
NewChannelNotifier creates a new in-process notifier.
func (*ChannelNotifier) Close ¶
func (n *ChannelNotifier) Close() error
Close shuts down the notifier and closes all listener channels.
func (*ChannelNotifier) Notify ¶
func (n *ChannelNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives signals when new events are appended. The channel is closed when the notifier is closed or ctx is cancelled.
func (*ChannelNotifier) Signal ¶
func (n *ChannelNotifier) Signal(sequence uint64)
Signal notifies all listeners that new events are available at the given sequence. Non-blocking: if a listener's buffer is full, the signal is dropped (polling catches it).
type Checkpoint ¶
type Checkpoint interface {
// Load returns the last processed sequence for a consumer. Returns 0 if new.
Load(ctx context.Context, consumerID string) (uint64, error)
// Save persists the consumer's position. Must be atomic.
Save(ctx context.Context, consumerID string, sequence uint64) error
}
Checkpoint tracks a consumer's last processed global sequence. Implementations must be durable for production use (SQLite, Postgres, etc).
type CheckpointInfo ¶
type CheckpointInfo struct {
// ConsumerID is the subscription's consumer identifier.
ConsumerID string
// Sequence is the global sequence number of the event being processed.
Sequence uint64
}
CheckpointInfo carries checkpoint metadata through context, allowing transactional handlers (e.g., pgview, sqlview) to save the checkpoint within the same database transaction as the projection update.
This enables atomic Evolve + Checkpoint, preventing double processing of non-idempotent side effects on crash recovery.
func CheckpointFromContext ¶
func CheckpointFromContext(ctx context.Context) (CheckpointInfo, bool)
CheckpointFromContext extracts checkpoint info from the context. Returns the info and true if present, or zero value and false otherwise.
Used by transactional projection packages (pgview, sqlview) to save the checkpoint within the same transaction as Evolve:
if info, ok := subscription.CheckpointFromContext(ctx); ok {
tx.Exec(ctx, "INSERT INTO checkpoints ...", info.ConsumerID, info.Sequence)
}
type Config ¶
type Config[E any] struct {
    // ConsumerID uniquely identifies this consumer. Required.
    ConsumerID string
    // Reader provides access to the global event stream. Required.
    Reader GlobalReader[E]
    // Checkpoint tracks consumed position. Required.
    Checkpoint Checkpoint
    // Handler processes individual events. Either Handler or BatchHandler required.
    Handler Handler[E]
    // BatchHandler processes events in batches. Mutually exclusive with Handler.
    BatchHandler BatchHandler[E]
    // Notifier provides low-latency event notifications. Optional.
    // Without it, the subscription relies on polling only.
    Notifier StoreNotifier
    // PollInterval is the fallback polling interval. Default: 500ms.
    // Belt and suspenders — ensures delivery even if notifier drops signals.
    PollInterval time.Duration
    // BatchSize is the max events to read per poll. Default: 100.
    BatchSize int
    // MaxRetries before sending to DLQ. Default: 5.
    MaxRetries int
    // RetryBaseDelay is the base delay for exponential backoff. Default: 100ms.
    RetryBaseDelay time.Duration
    // RetryMaxDelay caps the backoff. Default: 10s.
    RetryMaxDelay time.Duration
    // OnDLQ is called when an event exhausts retries. Optional.
    // If nil, the error is logged and the event is skipped.
    OnDLQ func(ctx context.Context, event GlobalEvent[E], err error)
    // Logger for operational logging. Optional.
    Logger *slog.Logger
    // GapTimeout is how long to wait for a sequence gap to fill before
    // assuming it was a rollback. Default: 500ms.
    // Set to 0 to disable gap detection (single-writer mode).
    GapTimeout time.Duration
    // MaxGapWait is the total time willing to wait for gaps before moving on.
    // Default: 5s.
    MaxGapWait time.Duration
    // OnGapTimeout controls what happens when a gap times out (GapTimeout elapsed).
    // Default (GapActionError): stop processing at the gap, retry on next poll.
    // GapActionSkip: skip the gap immediately (old behavior, risks missing events).
    OnGapTimeout GapAction
    // SequenceChecker verifies whether a specific global sequence exists in the
    // store. Used as a self-healing check before skipping a gap after MaxGapWait.
    // If nil, gaps are skipped without verification after MaxGapWait.
    SequenceChecker SequenceChecker
    // EventTypes filters events by their EventType field. Only events whose
    // EventType is in this list will be delivered to the handler.
    // If empty or nil, all events are delivered (no filtering).
    // Checkpoint still advances past filtered-out events to ensure progress.
    EventTypes []string
    // CheckpointEvery saves the checkpoint every N events instead of every event.
    // Default: 1 (save after every event).
    // Set to 100 to save every 100th event. On crash, at most N-1 events
    // may be replayed (idempotent handlers recommended for N > 1).
    CheckpointEvery int
    // CheckpointMaxAge is the maximum time between checkpoint saves.
    // When set, a checkpoint is saved when either CheckpointEvery OR
    // CheckpointMaxAge is reached, whichever comes first.
    // Default: 0 (disabled — only count-based batching).
    CheckpointMaxAge time.Duration
    // AtomicCheckpoint indicates that the Handler saves the checkpoint within
    // its own transaction (e.g., via CheckpointFromContext). When true, the
    // subscription injects CheckpointInfo into the context and skips the
    // external Checkpoint.Save call after each event.
    //
    // This prevents double processing on crash recovery for non-idempotent
    // side effects. See pgview and sqlview CheckpointInTx option.
    AtomicCheckpoint bool
    // LeaderLock enables leader election for this subscription. When set,
    // Start() acquires the lock before processing events — only the instance
    // that holds the lock will consume events. Other instances block in
    // Acquire until the leader releases (crash, context cancel, Stop).
    // The lock is acquired once at startup with zero per-event overhead.
    // Nil means no leader election — all instances process independently.
    LeaderLock eskit.LockRegistry
    // LagMonitor configures optional lag monitoring. When set, a background
    // goroutine periodically checks how far behind the subscription is and
    // invokes the callback with the current lag on each check interval.
    // Nil means no monitoring.
    LagMonitor *LagMonitorConfig
}
Config configures an EventSubscription.
func FromStateView ¶
func FromStateView[E any](view eskit.StateView[E], reader GlobalReader[E], checkpoint Checkpoint, opts ...Option[E]) Config[E]
FromStateView creates a subscription Config from a StateView. This is the standard way to wire a projection into the event stream.
The StateView's Evolve function receives eskit.Event[E], which is adapted from the subscription's GlobalEvent[E]. If the StateView has a Setup function, it is called before the subscription starts processing events.
type EventSubscription ¶
type EventSubscription[E any] struct {
    // contains filtered or unexported fields
}
EventSubscription is a durable event consumer with guaranteed delivery. It catches up from its checkpoint on start, then switches to live mode using notifier signals (if available) and periodic polling.
func New ¶
func New[E any](cfg Config[E]) (*EventSubscription[E], error)
New creates a new EventSubscription. Call Start to begin processing.
func (*EventSubscription[E]) Lag ¶
func (s *EventSubscription[E]) Lag(ctx context.Context) (uint64, error)
Lag returns how many events behind the consumer is (approximately).
func (*EventSubscription[E]) Start ¶
func (s *EventSubscription[E]) Start(ctx context.Context) error
Start begins processing events. Blocks until ctx is cancelled or Stop is called. Typically run in a goroutine.
When LeaderLock is configured, Start blocks on lock acquisition — only the instance that holds the lock processes events. The lock is held for the lifetime of the subscription.
func (*EventSubscription[E]) Stop ¶
func (s *EventSubscription[E]) Stop()
Stop signals the subscription to shut down gracefully.
func (*EventSubscription[E]) Wait ¶
func (s *EventSubscription[E]) Wait()
Wait blocks until the subscription has fully stopped. If Start has not been called yet, Wait returns immediately.
type GapAction ¶
type GapAction int
GapAction determines what happens when a sequence gap times out.
const (
    // GapActionError (default) stops processing at the gap and retries on the
    // next poll. The gap will eventually resolve when the slow transaction
    // commits or Postgres reclaims the rolled-back sequence.
    GapActionError GapAction = iota
    // GapActionSkip skips timed-out gaps immediately (old behavior).
    // Use only if you explicitly accept the risk of missing events from
    // slow transactions.
    GapActionSkip
)
type GlobalEvent ¶
type GlobalEvent[E any] struct {
    // GlobalSequence is the store-wide monotonic position. Never reused.
    GlobalSequence uint64
    // StreamType identifies the type/category of stream (e.g., "order", "account").
    StreamType string
    // StreamID identifies which stream this event belongs to.
    StreamID string
    // EventType is the string name of the event (e.g., "OrderCreated").
    // Available when the store has an EventRegistry configured.
    EventType string
    // Version is the per-stream version number.
    Version int
    // Data is the domain event payload.
    Data E
    // Metadata holds correlation, causation, and principal information.
    Metadata eskit.Metadata
    // Timestamp is when the event was recorded.
    Timestamp time.Time
}
GlobalEvent wraps a domain event with its global sequence number. The global sequence is a monotonically increasing number assigned at append time, used by consumers to track position and detect gaps.
type GlobalReader ¶
type GlobalReader[E any] interface {
    // ReadFrom returns events starting from the given global sequence (inclusive),
    // up to limit events. Returns events in global sequence order.
    ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
    // LatestSequence returns the highest global sequence in the store, or 0 if empty.
    LatestSequence(ctx context.Context) (uint64, error)
}
GlobalReader reads events by global sequence. Event stores must implement this to support subscriptions.
type Handler ¶
type Handler[E any] func(ctx context.Context, event GlobalEvent[E]) error
Handler processes events. Return nil to advance the checkpoint. Return an error to trigger retry logic.
type HealthHandler ¶
type HealthHandler struct {
// contains filtered or unexported fields
}
HealthHandler is an http.Handler that reports the health of registered projections based on their event lag. It returns HTTP 200 when all projections are within the configured lag threshold, and HTTP 503 when any projection is degraded. This is suitable for use as a load balancer health check endpoint.
func NewHealthHandler ¶
func NewHealthHandler(opts ...HealthOption) *HealthHandler
NewHealthHandler creates a new HealthHandler with the given options.
func (*HealthHandler) ServeHTTP ¶
func (h *HealthHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request)
ServeHTTP writes a JSON health report. It returns HTTP 200 when all projections are healthy and HTTP 503 when any projection is degraded.
type HealthOption ¶
type HealthOption func(*HealthHandler)
HealthOption configures a HealthHandler.
func WithLagThreshold ¶
func WithLagThreshold(d time.Duration) HealthOption
WithLagThreshold sets the maximum acceptable lag duration. If any projection's last update is older than this threshold, the handler reports degraded status. The default threshold is 30 seconds.
func WithProjection ¶
func WithProjection(name string, monitor LagMonitor) HealthOption
WithProjection registers a named projection's LagMonitor with the health handler.
type LagCallback ¶
type LagCallback func(name string, lag, position, latest uint64)
LagCallback is called periodically with the subscription's current lag. Parameters: consumer name, current lag, checkpoint position, latest store sequence. The callback fires on every check interval, regardless of lag value — this allows consumers like readiness monitors to track that a subscription is alive and caught up.
type LagMonitor ¶
type LagMonitor interface {
// Lag returns the current event lag (number of events behind head)
// and the time the projection last processed an event.
Lag() (lag uint64, lastProcessed time.Time)
}
LagMonitor reports the current lag status of a projection. Implementations must be safe for concurrent use.
type LagMonitorConfig ¶
type LagMonitorConfig struct {
// Callback is invoked periodically with current lag information.
// Always called on each check interval, even when lag is zero.
Callback LagCallback
// Threshold is unused (kept for backward compatibility). The callback
// always fires regardless of lag value.
Threshold uint64
// CheckInterval is how often the monitor checks the lag. Default: 10s.
CheckInterval time.Duration
}
LagMonitorConfig configures lag monitoring for a subscription.
type LagTracker ¶
type LagTracker struct {
// contains filtered or unexported fields
}
LagTracker tracks projection lag by recording updates from a LagCallback. Use LagTracker.Callback as the LagCallback in LagMonitorConfig and then pass the tracker as a LagMonitor to NewHealthHandler.
func (*LagTracker) Callback ¶
func (t *LagTracker) Callback() LagCallback
Callback returns a LagCallback that updates this tracker.
type MemoryCheckpoint ¶
type MemoryCheckpoint struct {
// contains filtered or unexported fields
}
MemoryCheckpoint is an in-memory checkpoint for testing.
func NewMemoryCheckpoint ¶
func NewMemoryCheckpoint() *MemoryCheckpoint
type MemoryGlobalReader ¶
type MemoryGlobalReader[E any] struct {
    // contains filtered or unexported fields
}
MemoryGlobalReader is an in-memory GlobalReader for testing. Events are appended manually and assigned monotonic global sequences.
func NewMemoryGlobalReader ¶
func NewMemoryGlobalReader[E any]() *MemoryGlobalReader[E]
NewMemoryGlobalReader creates a new in-memory global reader.
func (*MemoryGlobalReader[E]) Append ¶
func (r *MemoryGlobalReader[E]) Append(streamID string, data ...E) []uint64
Append adds events and assigns global sequences. Returns the assigned sequences.
func (*MemoryGlobalReader[E]) AppendTyped ¶
func (r *MemoryGlobalReader[E]) AppendTyped(streamID string, events ...struct {
    Data      E
    EventType string
}) []uint64
AppendTyped adds events with explicit event types and assigns global sequences.
func (*MemoryGlobalReader[E]) AppendWithSequence ¶
func (r *MemoryGlobalReader[E]) AppendWithSequence(seq uint64, streamID string, data E)
AppendWithSequence adds an event with a specific sequence number. Used for testing gap scenarios.
func (*MemoryGlobalReader[E]) LatestSequence ¶
func (r *MemoryGlobalReader[E]) LatestSequence(_ context.Context) (uint64, error)
func (*MemoryGlobalReader[E]) ReadFrom ¶
func (r *MemoryGlobalReader[E]) ReadFrom(_ context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
type Option ¶
Option configures a subscription created from a StateView.
func WithCheckpointEvery ¶
WithCheckpointEvery saves the checkpoint every N events instead of after every event. This dramatically reduces DB write pressure during catch-up at the cost of replaying up to N-1 events on crash recovery (idempotent handlers recommended).
Default: 1 (save after every event — backward compatible). Recommended: 100 for most workloads.
func WithCheckpointFlushInterval ¶
WithCheckpointFlushInterval sets the maximum time between checkpoint saves. When set, a checkpoint is saved when either CheckpointEvery OR the flush interval is reached, whichever comes first. This bounds the replay window in wall-clock time even when events arrive slowly.
Default: 0 (disabled — only count-based batching).
func WithLagMonitor ¶
func WithLagMonitor[E any](callback LagCallback, threshold uint64, checkInterval time.Duration) Option[E]
WithLagMonitor enables periodic lag monitoring for the subscription. The callback fires on every check interval with the consumer's current lag; the threshold parameter is retained for backward compatibility and does not gate the callback (see LagMonitorConfig). checkInterval controls how often the check runs.
Example:
WithLagMonitor[MyEvent](func(name string, lag, pos, latest uint64) {
    slog.Warn("subscription lagging", "consumer", name, "lag", lag)
}, 1000, 10*time.Second)
func WithLeaderLock ¶
func WithLeaderLock[E any](lock eskit.LockRegistry) Option[E]
WithLeaderLock enables leader election for the subscription. Only the instance holding the lock will process events.
func WithNotifier ¶
func WithNotifier[E any](n StoreNotifier) Option[E]
WithNotifier sets the store notifier for low-latency event delivery.
type SequenceChecker ¶
SequenceChecker verifies whether a specific global sequence exists in the store. Used for self-healing gap resolution: before skipping a gap after MaxGapWait, the subscription checks if the event actually committed (slow transaction).
type StoreAdapter ¶
type StoreAdapter[E any] struct {
    // contains filtered or unexported fields
}
StoreAdapter adapts any StoreReader (eskit.Event-based) into a subscription.GlobalReader (GlobalEvent-based). This bridges the type gap between the event store layer and the subscription system.
func NewStoreAdapter ¶
func NewStoreAdapter[E any](store StoreReader[E]) *StoreAdapter[E]
NewStoreAdapter wraps a store's global reader into a subscription.GlobalReader.
func (*StoreAdapter[E]) LatestSequence ¶
func (a *StoreAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence delegates to the underlying store.
func (*StoreAdapter[E]) ReadFrom ¶
func (a *StoreAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
ReadFrom reads events from the store and converts them to GlobalEvents.
type StoreNotifier ¶
StoreNotifier signals when new events are available in the store. This is an optimization for low latency — subscriptions work without it via periodic polling. The value on the channel is the latest global sequence (or 0 if unknown).
type StoreReader ¶
type StoreReader[E any] interface {
    ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
    LatestSequence(ctx context.Context) (uint64, error)
}
StoreReader is the interface that event stores implement for global reads. Both MemoryStore and sqlitestore.Store satisfy this via their ReadFrom/LatestSequence methods.
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages multiple subscriptions with lifecycle control.
func NewManager ¶
func NewManager() *SubscriptionManager
NewManager creates a new subscription manager.
func (*SubscriptionManager) ConsumerIDs ¶
func (m *SubscriptionManager) ConsumerIDs() []string
ConsumerIDs returns the IDs of all active subscriptions.
func (*SubscriptionManager) Remove ¶
func (m *SubscriptionManager) Remove(consumerID string) error
Remove stops and removes a subscription.
func (*SubscriptionManager) StopAll ¶
func (m *SubscriptionManager) StopAll()
StopAll stops all subscriptions and waits for them to finish.