stream

package
v0.0.0-...-6a00dce Latest
Warning

This package is not in the latest version of its module.

Published: Oct 28, 2025 | License: Apache-2.0, MIT | Imports: 30 | Imported by: 0

README

Stream Architecture

Stream extends github.com/delaneyj/witchbolt with Litestream-style, page-level replication. The controller hooks into the database commit path, captures the pages written for each transaction, and persists them to a local shadow log before forwarding them to one or more remote replicas.

Replication model

  • Page frames: Each commit emits a list of page frames that have been dirtied. Stream mirrors these frames into a segment file that can be replayed to reconstruct the database state.
  • Generations: A generation is a contiguous snapshot plus all subsequent segments. Generations rotate automatically if the controller detects a gap or an out-of-order transaction.
  • Snapshots: Full database snapshots are taken at configurable intervals to bound recovery time. Snapshots are versioned by generation and timestamp.
  • Retention: Background retention jobs delete expired snapshots and any segments that are older than the oldest retained snapshot.
  • Data loss window: The controller tracks the timestamp of the latest successful replication to each replica and reports the maximum lag.
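The generation rule above can be sketched as a check on transaction IDs. This is a hypothetical model, not Stream's implementation. It assumes each commit carries its TxID and the ParentTxID it was built on (both fields appear in SegmentHeader), and that a new generation starts whenever a commit does not extend the last TxID seen. The generationTracker type and the random hex generation IDs are illustrative only.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// generationTracker is a hypothetical helper (not part of the stream API)
// that decides when the segment chain breaks and a new generation must start.
type generationTracker struct {
	generation string // current generation ID; empty before the first commit
	lastTxID   uint64 // TxID of the last commit accepted into the generation
}

// newGenerationID returns a random 16-character hex identifier.
func newGenerationID() string {
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b[:])
}

// observe records a commit and reports whether it opened a new generation.
// A commit continues the current generation only if its parent is exactly
// the previous TxID. A gap or an out-of-order commit means the segments can
// no longer be replayed in sequence, so a fresh generation begins.
func (t *generationTracker) observe(txID, parentTxID uint64) (rotated bool) {
	if t.generation == "" || parentTxID != t.lastTxID || txID <= t.lastTxID {
		t.generation = newGenerationID()
		rotated = true
	}
	t.lastTxID = txID
	return rotated
}

func main() {
	var tr generationTracker
	fmt.Println(tr.observe(1, 0)) // first commit opens a generation
	fmt.Println(tr.observe(2, 1)) // contiguous commit, same generation
	fmt.Println(tr.observe(5, 4)) // TxIDs 3 and 4 were never seen: rotate
}
```

In a real controller a rotation would also have to trigger a fresh snapshot, because the new generation has no base image to replay its segments onto.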

Storage replicas

Stream ships with pluggable replica backends:

  • file: write segments and snapshots to a local directory tree.
  • s3: stream artefacts to any S3-compatible API via the MinIO client (AWS, GCP, Azure, MinIO, etc.).
  • sftp: push artefacts over SSH/SFTP to a remote host.
  • nats: store artefacts in a pre-provisioned NATS JetStream object store bucket.

These implementations are direct ports of Litestream's storage clients adapted to Stream's segment/snapshot format. Each backend exposes the same interface so new destinations can be added without modifying the core controller.

Compression

Segments and snapshots are compressed with Zstandard by default. The compression block in the controller config accepts codec: "none" to disable compression or codec: "zstd" (default) with an optional quality level (mapped to the closest Zstandard encoder level). These options apply globally to ensure deterministic restores.

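cfg := stream.Config{
	Compression: stream.CompressionConfig{
		Codec: stream.CompressionZSTD, // or stream.CompressionNone
		Level: 6,                      // mapped to the closest Zstandard encoder level
	},
}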
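The phrase "mapped to the closest Zstandard encoder level" suggests a lookup from the reference levels onto a smaller set of encoder settings. Pure-Go encoders such as github.com/klauspost/compress/zstd expose four speed tiers (SpeedFastest, SpeedDefault, SpeedBetterCompression, SpeedBestCompression) and provide EncoderLevelFromZstd for this purpose; whether Stream uses that library is not stated here. The sketch below is self-contained, and its tier names and cut-off points are assumptions chosen for illustration.

```go
package main

import "fmt"

// encoderTier is a hypothetical stand-in for a zstd encoder speed setting.
type encoderTier int

const (
	tierFastest encoderTier = iota + 1
	tierDefault
	tierBetter
	tierBest
)

// tierForLevel maps a zstd-style level onto the nearest tier. Level 0 is the
// omitempty zero value of CompressionConfig.Level, so it is treated as
// "unset" and gets the default tier (an assumption). Negative zstd levels are
// faster than level 1 and therefore land on the fastest tier.
func tierForLevel(level int) encoderTier {
	switch {
	case level == 0:
		return tierDefault
	case level < 3:
		return tierFastest
	case level < 6:
		return tierDefault
	case level < 10:
		return tierBetter
	default:
		return tierBest
	}
}

func main() {
	for _, lvl := range []int{-5, 0, 1, 3, 6, 19} {
		fmt.Println(lvl, tierForLevel(lvl))
	}
}
```

Collapsing many levels onto a few tiers is what makes a configured level "closest" rather than exact: levels 6 through 9 all produce the same encoder setting here.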

Usage

Register Stream via the PageFlushObservers option when opening a database:

db, err := witchbolt.Open(path, 0600, &witchbolt.Options{
	PageFlushObservers: []witchbolt.PageFlushObserverRegistration{
		stream.Observer(context.Background(), stream.Config{
			ShadowDir:        "/var/lib/myapp/stream",
			SnapshotInterval: 5 * time.Minute,
			Compression:      stream.CompressionConfig{Codec: stream.CompressionZSTD, Level: 6},
			Replicas: []stream.ReplicaConfig{
				&stream.FileReplicaConfig{Path: "/backups"},
				&stream.S3CompatibleConfig{
					Bucket:   "example-bucket",
					Prefix:   "stream",
					Endpoint: "s3.us-east-1.amazonaws.com",
					Region:   "us-east-1",
				},
				&stream.SFTPReplicaConfig{
					Host:    "backup.example.com",
					User:    "replicator",
					KeyPath: "/etc/witchbolt/sftp_key",
					Path:    "backups/db",
				},
				&stream.NATSReplicaConfig{
					URL:    "nats://nats.example.com:4222",
					Bucket: "litestream-backups",
					Prefix: "cluster-a/db",
					Creds:  "/etc/witchbolt/nats.creds",
				},
			},
		}),
	},
})
if err != nil {
	log.Fatal(err)
}
defer db.Close()

Restore flow

  1. Discover the newest generation and snapshot.
  2. Download and decompress the snapshot into a scratch location.
  3. Fetch and apply all newer segments.
  4. Atomically move the restored database into place.

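Setting Config.Restore.Enabled makes the controller run this flow automatically on startup when the database file does not exist or fails validation, so new nodes can bootstrap themselves. RestoreStandalone builds the replicas from a Config and performs the restore directly.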

Provenance

The Stream module and its replica targets are derived from Ben Johnson's Litestream project and carry the same Apache 2.0 licensing obligations. Please refer to the top-level README and LICENSE for attribution details.

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func Observer

Observer returns a PageFlushObserverRegistration that wires stream into witchbolt.Open options.

func RestoreStandalone

func RestoreStandalone(ctx context.Context, cfg Config) error

RestoreStandalone builds replicas from configuration and restores the database to disk.

Types

type CompressionConfig

type CompressionConfig struct {
	Codec  CompressionType `json:"codec"`
	Level  int             `json:"level,omitempty"`
	Window int             `json:"window,omitempty"`
}

CompressionConfig defines codec-agnostic tuning parameters.

func (*CompressionConfig) UnmarshalJSON

func (c *CompressionConfig) UnmarshalJSON(data []byte) error

type CompressionType

type CompressionType string

CompressionType enumerates the available wire compression codecs.

const (
	// CompressionNone disables compression for segments and snapshots.
	CompressionNone CompressionType = "none"
	// CompressionZSTD compresses payloads with Zstandard.
	CompressionZSTD CompressionType = "zstd"
)

type Config

type Config struct {
	// ShadowDir stores local segments and snapshots before upload.
	ShadowDir string `json:"shadowDir"`

	// SnapshotInterval controls how frequently full snapshots are taken.
	SnapshotInterval time.Duration `json:"snapshotInterval"`

	// Retention governs automatic pruning of old artefacts.
	Retention RetentionConfig `json:"retention"`

	// Compression configures the codec and tuning options for artefacts.
	Compression CompressionConfig `json:"compression"`

	// Replicas defines zero or more remote destinations.
	Replicas []ReplicaConfig `json:"replicas"`

	// Restore enables automatic restore on startup if the database file
	// does not exist or fails validation.
	Restore RestoreConfig `json:"restore"`

	// DataLossWindowThreshold controls the alerting threshold for acceptable
	// replication lag duration. Zero disables warnings.
	DataLossWindowThreshold time.Duration `json:"dataLossWindowThreshold"`
}

Config drives the stream controller behaviour.

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller implements a witchbolt.PageFlushObserver and orchestrates replication.

func Enable

func Enable(ctx context.Context, db *witchbolt.DB, cfg Config) (*Controller, error)

Enable constructs and starts a controller based on the provided configuration.

func NewController

func NewController(db *witchbolt.DB, cfg Config, replicas []Replica) (*Controller, error)

NewController creates a stream controller for the provided database.

func (*Controller) DataLossWindow

func (c *Controller) DataLossWindow() time.Duration

DataLossWindow reports the current worst-case replication lag.

func (*Controller) OnPageFlush

func (c *Controller) OnPageFlush(info witchbolt.PageFlushInfo) error

OnPageFlush implements witchbolt.PageFlushObserver.

func (*Controller) Start

func (c *Controller) Start(ctx context.Context) error

Start attaches the controller to the DB and launches background tasks.

func (*Controller) Stop

func (c *Controller) Stop(ctx context.Context) error

Stop detaches the controller and waits for background tasks to finish.

type FileReplica

type FileReplica struct {
	// contains filtered or unexported fields
}

FileReplica persists artefacts to the local filesystem.

func NewFileReplica

func NewFileReplica(cfg *FileReplicaConfig) (*FileReplica, error)

NewFileReplica constructs a FileReplica backed by a directory tree.

func (*FileReplica) Close

func (r *FileReplica) Close(context.Context) error

Close releases resources. It is a no-op for the file replica.

func (*FileReplica) FetchSegment

func (r *FileReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)

FetchSegment retrieves the referenced segment payload from disk.

func (*FileReplica) FetchSnapshot

func (r *FileReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)

FetchSnapshot retrieves the referenced snapshot payload from disk.

func (*FileReplica) LatestState

func (r *FileReplica) LatestState(ctx context.Context) (*RestoreState, error)

LatestState returns the most recent restore metadata.

func (*FileReplica) Name

func (r *FileReplica) Name() string

Name implements Replica.

func (*FileReplica) Prune

func (r *FileReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error

Prune removes expired artefacts according to retention policy.

func (*FileReplica) PutSegment

func (r *FileReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error

PutSegment writes the segment payload and adds it to replica state.

func (*FileReplica) PutSnapshot

func (r *FileReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error

PutSnapshot writes the snapshot payload and updates replica state.

type FileReplicaConfig

type FileReplicaConfig struct {
	Path string `json:"path"`
}

FileReplicaConfig defines the local filesystem replica behaviour.

type NATSReplica

type NATSReplica struct {
	// contains filtered or unexported fields
}

NATSReplica persists artefacts via NATS JetStream object storage.

func NewNATSReplica

func NewNATSReplica(_ context.Context, cfg *NATSReplicaConfig) (*NATSReplica, error)

NewNATSReplica constructs a JetStream-backed replica using the provided configuration.

func (*NATSReplica) Close

func (r *NATSReplica) Close(context.Context) error

Close terminates the JetStream connection.

func (*NATSReplica) FetchSegment

func (r *NATSReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)

FetchSegment downloads and decodes the referenced segment object.

func (*NATSReplica) FetchSnapshot

func (r *NATSReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)

FetchSnapshot downloads and decodes the referenced snapshot object.

func (*NATSReplica) LatestState

func (r *NATSReplica) LatestState(ctx context.Context) (*RestoreState, error)

LatestState retrieves the replica manifest from JetStream.

func (*NATSReplica) Name

func (r *NATSReplica) Name() string

Name implements Replica.

func (*NATSReplica) Prune

func (r *NATSReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error

Prune removes stale artefacts according to retention rules.

func (*NATSReplica) PutSegment

func (r *NATSReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error

PutSegment uploads a segment artefact to JetStream.

func (*NATSReplica) PutSnapshot

func (r *NATSReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error

PutSnapshot uploads snapshot artefact data into JetStream.

type NATSReplicaConfig

type NATSReplicaConfig struct {
	URL     string   `json:"url"`
	Bucket  string   `json:"bucket"`
	Prefix  string   `json:"prefix"`
	Creds   string   `json:"creds"`
	NKey    string   `json:"nkey"`
	RootCAs []string `json:"rootCAs"`
}

NATSReplicaConfig configures the NATS JetStream replica backend.

type PageFrame

type PageFrame struct {
	ID       uint64
	Overflow uint32
	Data     []byte
}

PageFrame captures a single page and its payload.
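In bbolt-style page layouts, a page header's overflow count is the number of extra contiguous pages a large node occupies. Assuming PageFrame follows that convention (ID is the first page number and Overflow the number of additional pages), a frame covers (Overflow+1)*pageSize bytes starting at byte offset ID*pageSize. The helpers below are hypothetical checks built on that assumption, not part of the package.

```go
package main

import "fmt"

// PageFrame mirrors the documented stream.PageFrame fields.
type PageFrame struct {
	ID       uint64
	Overflow uint32
	Data     []byte
}

// frameSpan returns the byte offset and expected length of a frame, assuming
// Overflow counts additional contiguous pages after ID.
func frameSpan(f PageFrame, pageSize int) (offset int64, length int) {
	return int64(f.ID) * int64(pageSize), (int(f.Overflow) + 1) * pageSize
}

// validateFrame checks that Data covers exactly the pages the frame claims.
func validateFrame(f PageFrame, pageSize int) error {
	if _, want := frameSpan(f, pageSize); len(f.Data) != want {
		return fmt.Errorf("page %d: have %d bytes, want %d (overflow %d)",
			f.ID, len(f.Data), want, f.Overflow)
	}
	return nil
}

func main() {
	f := PageFrame{ID: 7, Overflow: 2, Data: make([]byte, 3*4096)}
	off, n := frameSpan(f, 4096)
	fmt.Println(off, n, validateFrame(f, 4096))
}
```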

type Replica

type Replica interface {
	// Name returns a stable identifier for logs and metrics.
	Name() string
	// PutSnapshot persists a full snapshot blob for the given generation.
	PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error

	// PutSegment persists an incremental segment blob.
	PutSegment(ctx context.Context, generation string, segment *Segment) error

	// Prune applies retention rules and deletes expired data.
	Prune(ctx context.Context, generation string, retention RetentionConfig) error

	// FetchSnapshot retrieves the referenced snapshot blob.
	FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)

	// FetchSegment retrieves the referenced segment blob.
	FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)

	// LatestState returns the newest generation snapshot metadata for restores.
	LatestState(ctx context.Context) (*RestoreState, error)

	// Close releases any held resources.
	Close(ctx context.Context) error
}

Replica provides storage for Stream artefacts.

func BuildReplicas

func BuildReplicas(ctx context.Context, cfg Config) ([]Replica, error)

BuildReplicas constructs replica implementations from configuration.

type ReplicaConfig

type ReplicaConfig interface {
	// contains filtered or unexported methods
}

ReplicaConfig describes a backend-specific replica configuration.
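ReplicaConfig has only unexported methods, the usual Go idiom for a closed ("sealed") set of implementations: only types inside the package can satisfy it. Since the usage example passes *FileReplicaConfig, *S3CompatibleConfig, *SFTPReplicaConfig and *NATSReplicaConfig values, BuildReplicas plausibly dispatches on the concrete type. The sketch below shows that pattern with two hypothetical local config types and a made-up marker method; the validation rules are also assumptions, and it returns descriptions instead of live replicas to stay self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaConfig is sealed: its unexported method means only this package can
// add implementations. The method name is hypothetical.
type replicaConfig interface {
	isReplicaConfig()
}

type fileConfig struct{ Path string }
type s3Config struct{ Bucket, Prefix string }

func (*fileConfig) isReplicaConfig() {}
func (*s3Config) isReplicaConfig()   {}

// build dispatches on the concrete config type, as a BuildReplicas-style
// constructor might, and validates required fields.
func build(cfgs []replicaConfig) ([]string, error) {
	var out []string
	for i, c := range cfgs {
		switch c := c.(type) {
		case *fileConfig:
			if c.Path == "" {
				return nil, fmt.Errorf("replica %d: file path is required", i)
			}
			out = append(out, "file:"+c.Path)
		case *s3Config:
			if c.Bucket == "" {
				return nil, fmt.Errorf("replica %d: bucket is required", i)
			}
			out = append(out, "s3://"+c.Bucket+"/"+c.Prefix)
		case nil:
			return nil, errors.New("nil replica config")
		default:
			return nil, fmt.Errorf("replica %d: unsupported config %T", i, c)
		}
	}
	return out, nil
}

func main() {
	got, err := build([]replicaConfig{
		&fileConfig{Path: "/backups"},
		&s3Config{Bucket: "example-bucket", Prefix: "stream"},
	})
	fmt.Println(got, err)
}
```

Sealing the interface keeps the controller's switch exhaustive: an unknown backend type cannot be passed in from outside the package, so new destinations are added by extending the switch rather than by callers.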

type RestoreConfig

type RestoreConfig struct {
	// Enabled toggles automatic restores.
	Enabled bool `json:"enabled"`

	// TargetPath allows overriding the default database path.
	TargetPath string `json:"targetPath"`

	// TempDir controls where intermediate restore files live.
	TempDir string `json:"tempDir"`
}

RestoreConfig instructs the controller how and when to restore.

type RestoreState

type RestoreState struct {
	Generation   string
	Snapshot     *SnapshotDescriptor
	Segments     []SegmentDescriptor
	LastUploaded time.Time
}

RestoreState describes the current head artefact for a replica.

type RetentionConfig

type RetentionConfig struct {
	// SnapshotInterval optionally overrides Config.SnapshotInterval for
	// retention enforcement. Zero inherits the controller interval.
	SnapshotInterval time.Duration `json:"snapshotInterval"`

	// SnapshotRetention is the minimum duration to keep snapshots.
	SnapshotRetention time.Duration `json:"snapshotRetention"`

	// CheckInterval configures how often the pruning loop runs.
	CheckInterval time.Duration `json:"checkInterval"`
}

RetentionConfig describes snapshot & segment pruning rules.

type S3CompatibleConfig

type S3CompatibleConfig struct {
	Endpoint       string `json:"endpoint"`
	Region         string `json:"region"`
	Bucket         string `json:"bucket"`
	Prefix         string `json:"prefix"`
	AccessKey      string `json:"accessKey"`
	SecretKey      string `json:"secretKey"`
	SessionToken   string `json:"sessionToken"`
	Insecure       bool   `json:"insecure"`
	ForcePathStyle bool   `json:"forcePathStyle"`
}

S3CompatibleConfig configures a generic S3-compatible backend.

type S3CompatibleReplica

type S3CompatibleReplica struct {
	// contains filtered or unexported fields
}

S3CompatibleReplica stores artefacts in any S3-compatible object storage.

func NewS3CompatibleReplica

func NewS3CompatibleReplica(ctx context.Context, cfg *S3CompatibleConfig) (*S3CompatibleReplica, error)

NewS3CompatibleReplica constructs an S3-compatible replica backed by the MinIO client.

func (*S3CompatibleReplica) Close

Close satisfies the Replica interface. The MinIO client does not hold open resources.

func (*S3CompatibleReplica) FetchSegment

func (r *S3CompatibleReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)

FetchSegment downloads and decodes a segment artefact.

func (*S3CompatibleReplica) FetchSnapshot

func (r *S3CompatibleReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)

FetchSnapshot downloads and decodes a snapshot artefact.

func (*S3CompatibleReplica) LatestState

func (r *S3CompatibleReplica) LatestState(ctx context.Context) (*RestoreState, error)

LatestState retrieves the replica state manifest.

func (*S3CompatibleReplica) Name

func (r *S3CompatibleReplica) Name() string

Name implements Replica.

func (*S3CompatibleReplica) Prune

func (r *S3CompatibleReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error

Prune applies the retention policy to snapshots and segments.

func (*S3CompatibleReplica) PutSegment

func (r *S3CompatibleReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error

PutSegment uploads the segment artefact and appends metadata to replica state.

func (*S3CompatibleReplica) PutSnapshot

func (r *S3CompatibleReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error

PutSnapshot uploads the snapshot artefact and updates replica state.

type SFTPReplica

type SFTPReplica struct {
	// contains filtered or unexported fields
}

SFTPReplica persists artefacts over SFTP.

func NewSFTPReplica

func NewSFTPReplica(_ context.Context, cfg *SFTPReplicaConfig) (*SFTPReplica, error)

NewSFTPReplica constructs an SFTP replica backed by the provided configuration.

func (*SFTPReplica) Close

func (r *SFTPReplica) Close(context.Context) error

Close releases any open SFTP/SSH connections.

func (*SFTPReplica) FetchSegment

func (r *SFTPReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)

FetchSegment downloads and decodes the referenced segment blob.

func (*SFTPReplica) FetchSnapshot

func (r *SFTPReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)

FetchSnapshot downloads and decodes the referenced snapshot blob.

func (*SFTPReplica) LatestState

func (r *SFTPReplica) LatestState(ctx context.Context) (*RestoreState, error)

LatestState retrieves the replica metadata manifest.

func (*SFTPReplica) Name

func (r *SFTPReplica) Name() string

Name implements Replica.

func (*SFTPReplica) Prune

func (r *SFTPReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error

Prune removes expired artefacts as dictated by the retention policy.

func (*SFTPReplica) PutSegment

func (r *SFTPReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error

PutSegment uploads a segment artefact and records metadata in replica state.

func (*SFTPReplica) PutSnapshot

func (r *SFTPReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error

PutSnapshot uploads the snapshot artefact to the remote target and updates replica state.

type SFTPReplicaConfig

type SFTPReplicaConfig struct {
	Host     string `json:"host"`
	Port     int    `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	KeyPath  string `json:"keyPath"`
	Path     string `json:"path"`
}

SFTPReplicaConfig configures the SFTP replica backend.

type Segment

type Segment struct {
	Header SegmentHeader
	Pages  []PageFrame
	Data   []byte
}

Segment is the binary unit representing a set of page writes.

type SegmentDescriptor

type SegmentDescriptor struct {
	Name      string
	FirstTxID uint64
	LastTxID  uint64
	Timestamp time.Time
	Size      int64
}

SegmentDescriptor references a single segment artefact.

type SegmentHeader

type SegmentHeader struct {
	Magic             string            `json:"magic" cbor:"magic"`
	Version           int               `json:"version" cbor:"version"`
	TxID              uint64            `json:"txId" cbor:"txId"`
	ParentTxID        uint64            `json:"parentTxId" cbor:"parentTxId"`
	PageCount         int               `json:"pageCount" cbor:"pageCount"`
	PageSize          int               `json:"pageSize" cbor:"pageSize"`
	Checksum          uint64            `json:"checksum" cbor:"checksum"`
	Compression       CompressionType   `json:"compression" cbor:"compression"`
	CompressionLevel  int               `json:"compressionLevel,omitempty" cbor:"compressionLevel,omitempty"`
	CompressionWindow int               `json:"compressionWindow,omitempty" cbor:"compressionWindow,omitempty"`
	CreatedAt         time.Time         `json:"createdAt" cbor:"createdAt"`
	HighWaterMark     uint64            `json:"highWaterMark" cbor:"highWaterMark"`
	AdditionalAttrs   map[string]string `json:"additionalAttrs,omitempty" cbor:"additionalAttrs,omitempty"`
}

SegmentHeader stores metadata written alongside a segment.
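DecodeSegmentHeader is documented as reading a JSON-encoded header and performing basic validation. A minimal sketch of such a round trip follows, using a trimmed copy of the header. The magic string, the supported version, and the specific checks (including requiring TxID to exceed ParentTxID) are hypothetical; Stream's real values are not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Hypothetical constants; Stream's real magic and version are not documented.
const (
	exampleMagic   = "EXAMPLE-SEG"
	exampleVersion = 1
)

// segmentHeader is a trimmed copy of stream.SegmentHeader.
type segmentHeader struct {
	Magic      string    `json:"magic"`
	Version    int       `json:"version"`
	TxID       uint64    `json:"txId"`
	ParentTxID uint64    `json:"parentTxId"`
	PageCount  int       `json:"pageCount"`
	PageSize   int       `json:"pageSize"`
	CreatedAt  time.Time `json:"createdAt"`
}

func (h segmentHeader) encode() ([]byte, error) { return json.Marshal(h) }

// decodeSegmentHeader unmarshals the header and applies basic sanity checks.
func decodeSegmentHeader(buf []byte) (segmentHeader, error) {
	var h segmentHeader
	if err := json.Unmarshal(buf, &h); err != nil {
		return segmentHeader{}, fmt.Errorf("decode segment header: %w", err)
	}
	switch {
	case h.Magic != exampleMagic:
		return segmentHeader{}, fmt.Errorf("bad magic %q", h.Magic)
	case h.Version != exampleVersion:
		return segmentHeader{}, fmt.Errorf("unsupported version %d", h.Version)
	case h.PageSize <= 0 || h.PageCount < 0:
		return segmentHeader{}, errors.New("invalid page geometry")
	case h.TxID <= h.ParentTxID:
		return segmentHeader{}, errors.New("txId must be greater than parentTxId")
	}
	return h, nil
}

func main() {
	in := segmentHeader{Magic: exampleMagic, Version: exampleVersion, TxID: 42, ParentTxID: 41,
		PageCount: 3, PageSize: 4096, CreatedAt: time.Now().UTC()}
	buf, err := in.encode()
	if err != nil {
		panic(err)
	}
	out, err := decodeSegmentHeader(buf)
	fmt.Println(out.TxID, out.PageCount, err)
}
```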

func DecodeSegmentHeader

func DecodeSegmentHeader(buf []byte) (SegmentHeader, error)

DecodeSegmentHeader reads a JSON-encoded header and performs basic validation.

func (SegmentHeader) Encode

func (h SegmentHeader) Encode() ([]byte, error)

Encode marshals the segment header to bytes.

type Snapshot

type Snapshot struct {
	Header SnapshotHeader
	Data   []byte
}

Snapshot represents a complete copy of the database file.

type SnapshotDescriptor

type SnapshotDescriptor struct {
	Name      string
	Timestamp time.Time
	Size      int64
}

SnapshotDescriptor references a stored snapshot object.

type SnapshotHeader

type SnapshotHeader struct {
	Magic             string          `json:"magic" cbor:"magic"`
	Version           int             `json:"version" cbor:"version"`
	TxID              uint64          `json:"txId" cbor:"txId"`
	PageCount         uint64          `json:"pageCount" cbor:"pageCount"`
	PageSize          int             `json:"pageSize" cbor:"pageSize"`
	Compression       CompressionType `json:"compression" cbor:"compression"`
	CompressionLevel  int             `json:"compressionLevel,omitempty" cbor:"compressionLevel,omitempty"`
	CompressionWindow int             `json:"compressionWindow,omitempty" cbor:"compressionWindow,omitempty"`
	CreatedAt         time.Time       `json:"createdAt" cbor:"createdAt"`
}

SnapshotHeader describes a snapshot artefact.

func DecodeSnapshotHeader

func DecodeSnapshotHeader(buf []byte) (SnapshotHeader, error)

DecodeSnapshotHeader parses a JSON snapshot header.

func (SnapshotHeader) Encode

func (h SnapshotHeader) Encode() ([]byte, error)

Encode marshals a snapshot header to bytes.
