Documentation ¶
Index ¶
- type Logger
- type MetricsHook
- type Option
- type SQLiteStore
- func (s *SQLiteStore) Append(ctx context.Context, event *eventbus.Event) (eventbus.Offset, error)
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) LoadOffset(ctx context.Context, subscriptionID string) (eventbus.Offset, error)
- func (s *SQLiteStore) Read(ctx context.Context, from eventbus.Offset, limit int) ([]*eventbus.StoredEvent, eventbus.Offset, error)
- func (s *SQLiteStore) ReadStream(ctx context.Context, from eventbus.Offset) iter.Seq2[*eventbus.StoredEvent, error]
- func (s *SQLiteStore) SaveOffset(ctx context.Context, subscriptionID string, offset eventbus.Offset) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Logger ¶
type Logger interface {
Debug(msg string, args ...any)
Info(msg string, args ...any)
Error(msg string, args ...any)
}
Logger is an interface for logging operations. Each method takes a message followed by optional variadic arguments.
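The method set above matches the Debug, Info, and Error methods of *slog.Logger from the standard library, so a slog logger can be used wherever a Logger is expected. The following is a minimal sketch of that idea, not code from this package: the Logger interface is copied locally so the example compiles on its own, and dropTime and render are hypothetical helpers introduced only for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// Logger is a local copy of the interface documented above, so that this
// sketch compiles without importing the package.
type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Error(msg string, args ...any)
}

// Compile-time check: *slog.Logger has the required method set.
var _ Logger = (*slog.Logger)(nil)

// dropTime (hypothetical helper) removes the timestamp attribute so the
// rendered line is stable from run to run.
func dropTime(groups []string, a slog.Attr) slog.Attr {
	if len(groups) == 0 && a.Key == slog.TimeKey {
		return slog.Attr{}
	}
	return a
}

// render (hypothetical helper) logs one Info line through the Logger
// interface and returns the text that was written.
func render(msg string) string {
	var buf bytes.Buffer
	handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{
		Level:       slog.LevelDebug,
		ReplaceAttr: dropTime,
	})
	var log Logger = slog.New(handler)
	log.Info(msg, "path", "events.db")
	return buf.String()
}

func main() {
	fmt.Print(render("opened"))
}
```

How a Logger is handed to the store is not shown in this reference, so the sketch stops at constructing one.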
type MetricsHook ¶
type MetricsHook interface {
OnAppend(duration time.Duration, err error)
OnRead(duration time.Duration, count int, err error)
OnSaveOffset(duration time.Duration, err error)
OnLoadOffset(duration time.Duration, err error)
}
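MetricsHook is called after store operations complete. Each method receives the operation's duration and resulting error; OnRead additionally receives a count.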
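As an illustration of what an implementation might look like, here is a small sketch of a hook that only keeps counters. The MetricsHook interface is copied locally so the example is self-contained; countingHook and errDemo are hypothetical names. The mutex is a precaution, since this reference does not say whether the store invokes hooks concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// MetricsHook is a local copy of the interface documented above.
type MetricsHook interface {
	OnAppend(duration time.Duration, err error)
	OnRead(duration time.Duration, count int, err error)
	OnSaveOffset(duration time.Duration, err error)
	OnLoadOffset(duration time.Duration, err error)
}

// errDemo is a hypothetical error used only to exercise the failure path.
var errDemo = errors.New("demo failure")

// countingHook (hypothetical) accumulates simple totals across all callbacks.
type countingHook struct {
	mu         sync.Mutex
	calls      int
	failures   int
	eventsRead int
	total      time.Duration
}

// record updates the shared counters under the lock.
func (h *countingHook) record(d time.Duration, count int, err error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.calls++
	h.total += d
	h.eventsRead += count
	if err != nil {
		h.failures++
	}
}

func (h *countingHook) OnAppend(d time.Duration, err error)          { h.record(d, 0, err) }
func (h *countingHook) OnRead(d time.Duration, count int, err error) { h.record(d, count, err) }
func (h *countingHook) OnSaveOffset(d time.Duration, err error)      { h.record(d, 0, err) }
func (h *countingHook) OnLoadOffset(d time.Duration, err error)      { h.record(d, 0, err) }

// Compile-time check that countingHook satisfies the interface.
var _ MetricsHook = (*countingHook)(nil)

func main() {
	h := &countingHook{}
	h.OnAppend(2*time.Millisecond, nil)
	h.OnRead(5*time.Millisecond, 10, nil)
	h.OnLoadOffset(time.Millisecond, errDemo)
	fmt.Println(h.calls, h.failures, h.eventsRead, h.total)
}
```

A hook like this would be passed to the store through WithMetricsHook.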
type Option ¶
type Option func(*config)
Option configures the SQLiteStore.
func WithAutoMigrate ¶
WithAutoMigrate enables or disables automatic schema migration. Default is true.
func WithBusyTimeout ¶
WithBusyTimeout sets the SQLite busy timeout. Default is 5 seconds.
func WithMetricsHook ¶
func WithMetricsHook(hook MetricsHook) Option
WithMetricsHook sets the metrics hook for the store.
func WithStreamBatchSize ¶
WithStreamBatchSize sets the batch size for ReadStream operations. When > 0, events are fetched in batches of this size using LIMIT. Default is 0 (no batching, fetch all matching rows at once).
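Option follows the functional-options pattern: each With* function returns a closure that mutates an unexported config. The sketch below shows how such a pattern typically works, using the defaults stated above. It is an assumption-laden illustration rather than the package's source: the config fields, the lower-case constructor names, their parameter types, and newConfig are all hypothetical, because the real config struct and some signatures are not shown in this reference.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in; the real struct is unexported and its
// fields are not documented.
type config struct {
	autoMigrate     bool
	busyTimeout     time.Duration
	streamBatchSize int
}

// Option has the same shape as the documented type.
type Option func(*config)

// The constructors below are hypothetical; their parameter types are guesses.
func withAutoMigrate(enabled bool) Option {
	return func(c *config) { c.autoMigrate = enabled }
}

func withBusyTimeout(d time.Duration) Option {
	return func(c *config) { c.busyTimeout = d }
}

func withStreamBatchSize(n int) Option {
	return func(c *config) { c.streamBatchSize = n }
}

// newConfig (hypothetical) starts from the documented defaults and then
// applies each option in the order given, so later options win.
func newConfig(opts ...Option) config {
	c := config{
		autoMigrate:     true,            // documented default
		busyTimeout:     5 * time.Second, // documented default
		streamBatchSize: 0,               // documented default: no batching
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withAutoMigrate(false), withStreamBatchSize(500))
	fmt.Println(c.autoMigrate, c.busyTimeout, c.streamBatchSize)
}
```

The appeal of this design is that New can keep a stable signature while new settings are added as further With* functions.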
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore implements eventbus.EventStore using SQLite. It stores events with integer positions internally and exposes them as opaque string offsets externally.
func New ¶
func New(path string, opts ...Option) (*SQLiteStore, error)
New creates a new SQLiteStore with the given path and options.
Note: When WithAutoMigrate is enabled (the default), migrations run with context.Background() and are not cancellable. This ensures migrations complete fully to avoid leaving the database in an inconsistent state.
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the database connection and releases resources. Errors from closing prepared statements are ignored because closing them does not fail in practice with SQLite (the driver handles cleanup when the connection closes).
func (*SQLiteStore) LoadOffset ¶
func (s *SQLiteStore) LoadOffset(ctx context.Context, subscriptionID string) (eventbus.Offset, error)
LoadOffset implements eventbus.SubscriptionStore.
func (*SQLiteStore) Read ¶
func (s *SQLiteStore) Read(ctx context.Context, from eventbus.Offset, limit int) ([]*eventbus.StoredEvent, eventbus.Offset, error)
Read returns events starting after the given offset.
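A common way to consume a method with this shape is to page through the log, feeding the returned offset back in as the next from value. The sketch below shows that loop against an in-memory fake. Everything in it is a stand-in: Offset, StoredEvent, reader, memStore, drain, and demo are hypothetical, and it assumes that the second return value is the offset to resume from and that an empty offset means "from the beginning", neither of which this reference confirms.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Offset and StoredEvent are simplified stand-ins for the eventbus types.
type Offset string

type StoredEvent struct {
	Offset  Offset
	Payload string
}

// reader captures just the Read method documented above.
type reader interface {
	Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
}

// memStore is a fake store: positions are 1-based integers encoded as strings.
type memStore struct{ events []*StoredEvent }

func newMemStore(n int) *memStore {
	s := &memStore{}
	for i := 1; i <= n; i++ {
		id := strconv.Itoa(i)
		s.events = append(s.events, &StoredEvent{Offset: Offset(id), Payload: "event-" + id})
	}
	return s
}

// Read returns up to limit events positioned strictly after from.
func (s *memStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error) {
	if err := ctx.Err(); err != nil {
		return nil, from, err
	}
	pos := 0
	if from != "" {
		p, err := strconv.Atoi(string(from))
		if err != nil || p < 0 {
			return nil, from, fmt.Errorf("invalid offset %q", from)
		}
		pos = p
	}
	if pos > len(s.events) {
		pos = len(s.events)
	}
	end := len(s.events)
	if limit > 0 && pos+limit < end {
		end = pos + limit
	}
	batch := s.events[pos:end]
	next := from
	if len(batch) > 0 {
		next = batch[len(batch)-1].Offset
	}
	return batch, next, nil
}

// drain pages through r until a short or empty page signals the end.
func drain(ctx context.Context, r reader, from Offset, pageSize int) (int, Offset, error) {
	total := 0
	for {
		batch, next, err := r.Read(ctx, from, pageSize)
		if err != nil {
			return total, from, err
		}
		total += len(batch)
		from = next
		if len(batch) == 0 || len(batch) < pageSize {
			return total, from, nil
		}
	}
}

// demo reads 7 fake events in pages of 3.
func demo() (int, Offset, error) {
	return drain(context.Background(), newMemStore(7), "", 3)
}

func main() {
	fmt.Println(demo())
}
```

Stopping on a short page saves one round trip compared with reading until an empty page comes back.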
func (*SQLiteStore) ReadStream ¶
func (s *SQLiteStore) ReadStream(ctx context.Context, from eventbus.Offset) iter.Seq2[*eventbus.StoredEvent, error]
ReadStream implements eventbus.EventStoreStreamer for memory-efficient event streaming. It uses cursor-based iteration, keeping only one row in memory at a time. The database rows are properly closed when:
- The iteration completes naturally
- The consumer breaks out of the range loop
- The context is cancelled
func (*SQLiteStore) SaveOffset ¶
func (s *SQLiteStore) SaveOffset(ctx context.Context, subscriptionID string, offset eventbus.Offset) error
SaveOffset implements eventbus.SubscriptionStore.