Documentation
¶
Index ¶
- func Observer(ctx context.Context, cfg Config) witchbolt.PageFlushObserverRegistration
- func RestoreStandalone(ctx context.Context, cfg Config) error
- type CompressionConfig
- type CompressionType
- type Config
- type Controller
- type FileReplica
- func (r *FileReplica) Close(context.Context) error
- func (r *FileReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
- func (r *FileReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
- func (r *FileReplica) LatestState(ctx context.Context) (*RestoreState, error)
- func (r *FileReplica) Name() string
- func (r *FileReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
- func (r *FileReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
- func (r *FileReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
- type FileReplicaConfig
- type NATSReplica
- func (r *NATSReplica) Close(context.Context) error
- func (r *NATSReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
- func (r *NATSReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
- func (r *NATSReplica) LatestState(ctx context.Context) (*RestoreState, error)
- func (r *NATSReplica) Name() string
- func (r *NATSReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
- func (r *NATSReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
- func (r *NATSReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
- type NATSReplicaConfig
- type PageFrame
- type Replica
- type ReplicaConfig
- type RestoreConfig
- type RestoreState
- type RetentionConfig
- type S3CompatibleConfig
- type S3CompatibleReplica
- func (r *S3CompatibleReplica) Close(context.Context) error
- func (r *S3CompatibleReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
- func (r *S3CompatibleReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
- func (r *S3CompatibleReplica) LatestState(ctx context.Context) (*RestoreState, error)
- func (r *S3CompatibleReplica) Name() string
- func (r *S3CompatibleReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
- func (r *S3CompatibleReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
- func (r *S3CompatibleReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
- type SFTPReplica
- func (r *SFTPReplica) Close(context.Context) error
- func (r *SFTPReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
- func (r *SFTPReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
- func (r *SFTPReplica) LatestState(ctx context.Context) (*RestoreState, error)
- func (r *SFTPReplica) Name() string
- func (r *SFTPReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
- func (r *SFTPReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
- func (r *SFTPReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
- type SFTPReplicaConfig
- type Segment
- type SegmentDescriptor
- type SegmentHeader
- type Snapshot
- type SnapshotDescriptor
- type SnapshotHeader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type CompressionConfig ¶
type CompressionConfig struct {
Codec CompressionType `json:"codec"`
Level int `json:"level,omitempty"`
Window int `json:"window,omitempty"`
}
CompressionConfig defines codec-agnostic tuning parameters.
func (*CompressionConfig) UnmarshalJSON ¶
func (c *CompressionConfig) UnmarshalJSON(data []byte) error
type CompressionType ¶
type CompressionType string
CompressionType enumerates the available wire compression codecs.
const (
// CompressionNone disables compression for segments and snapshots.
CompressionNone CompressionType = "none"
// CompressionZSTD compresses payloads with Zstandard.
CompressionZSTD CompressionType = "zstd"
)
type Config ¶
type Config struct {
// ShadowDir stores local segments and snapshots before upload.
ShadowDir string `json:"shadowDir"`
// SnapshotInterval controls how frequently full snapshots are taken.
SnapshotInterval time.Duration `json:"snapshotInterval"`
// Retention governs automatic pruning of old artefacts.
Retention RetentionConfig `json:"retention"`
// Compression configures the codec and tuning options for artefacts.
Compression CompressionConfig `json:"compression"`
// Replicas defines zero or more remote destinations.
Replicas []ReplicaConfig `json:"replicas"`
// Restore enables automatic restore on startup if the database file
// does not exist or fails validation.
Restore RestoreConfig `json:"restore"`
// DataLossWindowThreshold controls the alerting threshold for acceptable
// replication lag duration. Zero disables warnings.
DataLossWindowThreshold time.Duration `json:"dataLossWindowThreshold"`
}
Config drives the stream controller behaviour.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller implements a witchbolt.PageFlushObserver and orchestrates replication.
func NewController ¶
NewController creates a stream controller for the provided database.
func (*Controller) DataLossWindow ¶
func (c *Controller) DataLossWindow() time.Duration
DataLossWindow reports the current worst-case replication lag.
func (*Controller) OnPageFlush ¶
func (c *Controller) OnPageFlush(info witchbolt.PageFlushInfo) error
OnPageFlush implements witchbolt.PageFlushObserver.
type FileReplica ¶
type FileReplica struct {
// contains filtered or unexported fields
}
FileReplica persists artefacts to the local filesystem.
func NewFileReplica ¶
func NewFileReplica(cfg *FileReplicaConfig) (*FileReplica, error)
NewFileReplica constructs a FileReplica backed by a directory tree.
func (*FileReplica) Close ¶
func (r *FileReplica) Close(context.Context) error
Close releases resources. It is a no-op for the file replica.
func (*FileReplica) FetchSegment ¶
func (r *FileReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
FetchSegment retrieves the referenced segment payload from disk.
func (*FileReplica) FetchSnapshot ¶
func (r *FileReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
FetchSnapshot retrieves the referenced snapshot payload from disk.
func (*FileReplica) LatestState ¶
func (r *FileReplica) LatestState(ctx context.Context) (*RestoreState, error)
LatestState returns the most recent restore metadata.
func (*FileReplica) Prune ¶
func (r *FileReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
Prune removes expired artefacts according to retention policy.
func (*FileReplica) PutSegment ¶
func (r *FileReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
PutSegment writes the segment payload and adds it to replica state.
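func (*FileReplica) PutSnapshot ¶
func (r *FileReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error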
PutSnapshot writes the snapshot payload and updates replica state.
type FileReplicaConfig ¶
type FileReplicaConfig struct {
Path string `json:"path"`
}
FileReplicaConfig defines the local filesystem replica behaviour.
type NATSReplica ¶
type NATSReplica struct {
// contains filtered or unexported fields
}
NATSReplica persists artefacts via NATS JetStream object storage.
func NewNATSReplica ¶
func NewNATSReplica(_ context.Context, cfg *NATSReplicaConfig) (*NATSReplica, error)
NewNATSReplica constructs a JetStream-backed replica using the provided configuration.
func (*NATSReplica) Close ¶
func (r *NATSReplica) Close(context.Context) error
Close terminates the JetStream connection.
func (*NATSReplica) FetchSegment ¶
func (r *NATSReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
FetchSegment downloads and decodes the referenced segment object.
func (*NATSReplica) FetchSnapshot ¶
func (r *NATSReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
FetchSnapshot downloads and decodes the referenced snapshot object.
func (*NATSReplica) LatestState ¶
func (r *NATSReplica) LatestState(ctx context.Context) (*RestoreState, error)
LatestState retrieves the replica manifest from JetStream.
func (*NATSReplica) Prune ¶
func (r *NATSReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
Prune removes stale artefacts according to retention rules.
func (*NATSReplica) PutSegment ¶
func (r *NATSReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
PutSegment uploads a segment artefact to JetStream.
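func (*NATSReplica) PutSnapshot ¶
func (r *NATSReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error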
PutSnapshot uploads snapshot artefact data into JetStream.
type NATSReplicaConfig ¶
type NATSReplicaConfig struct {
URL string `json:"url"`
Bucket string `json:"bucket"`
Prefix string `json:"prefix"`
Creds string `json:"creds"`
NKey string `json:"nkey"`
RootCAs []string `json:"rootCAs"`
}
NATSReplicaConfig configures the NATS JetStream replica backend.
type Replica ¶
type Replica interface {
// Name returns a stable identifier for logs and metrics.
Name() string
// PutSnapshot persists a full snapshot blob for the given generation.
PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
// PutSegment persists an incremental segment blob.
PutSegment(ctx context.Context, generation string, segment *Segment) error
// Prune applies retention rules and deletes expired data.
Prune(ctx context.Context, generation string, retention RetentionConfig) error
// FetchSnapshot retrieves the referenced snapshot blob.
FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
// FetchSegment retrieves the referenced segment blob.
FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
// LatestState returns the newest generation snapshot metadata for restores.
LatestState(ctx context.Context) (*RestoreState, error)
// Close releases any held resources.
Close(ctx context.Context) error
}
Replica provides storage for Stream artefacts.
type ReplicaConfig ¶
type ReplicaConfig interface {
// contains filtered or unexported methods
}
ReplicaConfig describes a backend-specific replica configuration.
type RestoreConfig ¶
type RestoreConfig struct {
// Enabled toggles automatic restores.
Enabled bool `json:"enabled"`
// TargetPath allows overriding the default database path.
TargetPath string `json:"targetPath"`
// TempDir controls where intermediate restore files live.
TempDir string `json:"tempDir"`
}
RestoreConfig instructs the controller how and when to restore.
type RestoreState ¶
type RestoreState struct {
Generation string
Snapshot *SnapshotDescriptor
Segments []SegmentDescriptor
LastUploaded time.Time
}
RestoreState describes the current head artefact for a replica.
type RetentionConfig ¶
type RetentionConfig struct {
// SnapshotInterval optionally overrides Config.SnapshotInterval for
// retention enforcement. Zero inherits the controller interval.
SnapshotInterval time.Duration `json:"snapshotInterval"`
// SnapshotRetention is the minimum duration to keep snapshots.
SnapshotRetention time.Duration `json:"snapshotRetention"`
// CheckInterval configures how often the pruning loop runs.
CheckInterval time.Duration `json:"checkInterval"`
}
RetentionConfig describes snapshot and segment pruning rules.
type S3CompatibleConfig ¶
type S3CompatibleConfig struct {
Endpoint string `json:"endpoint"`
Region string `json:"region"`
Bucket string `json:"bucket"`
Prefix string `json:"prefix"`
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
SessionToken string `json:"sessionToken"`
Insecure bool `json:"insecure"`
ForcePathStyle bool `json:"forcePathStyle"`
}
S3CompatibleConfig configures a generic S3-compatible backend.
type S3CompatibleReplica ¶
type S3CompatibleReplica struct {
// contains filtered or unexported fields
}
S3CompatibleReplica stores artefacts in any S3-compatible object storage.
func NewS3CompatibleReplica ¶
func NewS3CompatibleReplica(ctx context.Context, cfg *S3CompatibleConfig) (*S3CompatibleReplica, error)
NewS3CompatibleReplica constructs an S3-compatible replica backed by the MinIO client.
func (*S3CompatibleReplica) Close ¶
func (r *S3CompatibleReplica) Close(context.Context) error
Close satisfies the Replica interface. The MinIO client does not hold open resources.
func (*S3CompatibleReplica) FetchSegment ¶
func (r *S3CompatibleReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
FetchSegment downloads and decodes a segment artefact.
func (*S3CompatibleReplica) FetchSnapshot ¶
func (r *S3CompatibleReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
FetchSnapshot downloads and decodes a snapshot artefact.
func (*S3CompatibleReplica) LatestState ¶
func (r *S3CompatibleReplica) LatestState(ctx context.Context) (*RestoreState, error)
LatestState retrieves the replica state manifest.
func (*S3CompatibleReplica) Name ¶
func (r *S3CompatibleReplica) Name() string
Name implements Replica.
func (*S3CompatibleReplica) Prune ¶
func (r *S3CompatibleReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
Prune applies the retention policy to snapshots and segments.
func (*S3CompatibleReplica) PutSegment ¶
func (r *S3CompatibleReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
PutSegment uploads the segment artefact and appends metadata to replica state.
func (*S3CompatibleReplica) PutSnapshot ¶
func (r *S3CompatibleReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error
PutSnapshot uploads the snapshot artefact and updates replica state.
type SFTPReplica ¶
type SFTPReplica struct {
// contains filtered or unexported fields
}
SFTPReplica persists artefacts over SFTP.
func NewSFTPReplica ¶
func NewSFTPReplica(_ context.Context, cfg *SFTPReplicaConfig) (*SFTPReplica, error)
NewSFTPReplica constructs an SFTP replica backed by the provided configuration.
func (*SFTPReplica) Close ¶
func (r *SFTPReplica) Close(context.Context) error
Close releases any open SFTP/SSH connections.
func (*SFTPReplica) FetchSegment ¶
func (r *SFTPReplica) FetchSegment(ctx context.Context, generation string, desc SegmentDescriptor) (*Segment, error)
FetchSegment downloads and decodes the referenced segment blob.
func (*SFTPReplica) FetchSnapshot ¶
func (r *SFTPReplica) FetchSnapshot(ctx context.Context, generation string, desc *SnapshotDescriptor) (*Snapshot, error)
FetchSnapshot downloads and decodes the referenced snapshot blob.
func (*SFTPReplica) LatestState ¶
func (r *SFTPReplica) LatestState(ctx context.Context) (*RestoreState, error)
LatestState retrieves the replica metadata manifest.
func (*SFTPReplica) Prune ¶
func (r *SFTPReplica) Prune(ctx context.Context, generation string, retention RetentionConfig) error
Prune removes expired artefacts as dictated by the retention policy.
func (*SFTPReplica) PutSegment ¶
func (r *SFTPReplica) PutSegment(ctx context.Context, generation string, segment *Segment) error
PutSegment uploads a segment artefact and records metadata in replica state.
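func (*SFTPReplica) PutSnapshot ¶
func (r *SFTPReplica) PutSnapshot(ctx context.Context, generation string, snapshot *Snapshot) error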
PutSnapshot uploads the snapshot artefact to the remote target and updates replica state.
type SFTPReplicaConfig ¶
type SFTPReplicaConfig struct {
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
Password string `json:"password"`
KeyPath string `json:"keyPath"`
Path string `json:"path"`
}
SFTPReplicaConfig configures the SFTP replica backend.
type Segment ¶
type Segment struct {
Header SegmentHeader
Pages []PageFrame
Data []byte
}
Segment is the binary unit representing a set of page writes.
type SegmentDescriptor ¶
type SegmentDescriptor struct {
Name string
FirstTxID uint64
LastTxID uint64
Timestamp time.Time
Size int64
}
SegmentDescriptor references a single segment artefact.
type SegmentHeader ¶
type SegmentHeader struct {
Magic string `json:"magic" cbor:"magic"`
Version int `json:"version" cbor:"version"`
TxID uint64 `json:"txId" cbor:"txId"`
ParentTxID uint64 `json:"parentTxId" cbor:"parentTxId"`
PageCount int `json:"pageCount" cbor:"pageCount"`
PageSize int `json:"pageSize" cbor:"pageSize"`
Checksum uint64 `json:"checksum" cbor:"checksum"`
Compression CompressionType `json:"compression" cbor:"compression"`
CompressionLevel int `json:"compressionLevel,omitempty" cbor:"compressionLevel,omitempty"`
CompressionWindow int `json:"compressionWindow,omitempty" cbor:"compressionWindow,omitempty"`
CreatedAt time.Time `json:"createdAt" cbor:"createdAt"`
HighWaterMark uint64 `json:"highWaterMark" cbor:"highWaterMark"`
AdditionalAttrs map[string]string `json:"additionalAttrs,omitempty" cbor:"additionalAttrs,omitempty"`
}
SegmentHeader stores metadata written alongside a segment.
func DecodeSegmentHeader ¶
func DecodeSegmentHeader(buf []byte) (SegmentHeader, error)
DecodeSegmentHeader reads a JSON-encoded header and performs basic validation.
func (SegmentHeader) Encode ¶
func (h SegmentHeader) Encode() ([]byte, error)
Encode marshals the segment header to bytes.
type Snapshot ¶
type Snapshot struct {
Header SnapshotHeader
Data []byte
}
Snapshot represents a complete copy of the database file.
type SnapshotDescriptor ¶
SnapshotDescriptor references a stored snapshot object.
type SnapshotHeader ¶
type SnapshotHeader struct {
Magic string `json:"magic" cbor:"magic"`
Version int `json:"version" cbor:"version"`
TxID uint64 `json:"txId" cbor:"txId"`
PageCount uint64 `json:"pageCount" cbor:"pageCount"`
PageSize int `json:"pageSize" cbor:"pageSize"`
Compression CompressionType `json:"compression" cbor:"compression"`
CompressionLevel int `json:"compressionLevel,omitempty" cbor:"compressionLevel,omitempty"`
CompressionWindow int `json:"compressionWindow,omitempty" cbor:"compressionWindow,omitempty"`
CreatedAt time.Time `json:"createdAt" cbor:"createdAt"`
}
SnapshotHeader describes a snapshot artefact.
func DecodeSnapshotHeader ¶
func DecodeSnapshotHeader(buf []byte) (SnapshotHeader, error)
DecodeSnapshotHeader parses a JSON snapshot header.
func (SnapshotHeader) Encode ¶
func (h SnapshotHeader) Encode() ([]byte, error)
Encode marshals a snapshot header to bytes.