Documentation
¶
Index ¶
- Constants
- Variables
- func ApplySnapshot(ag Aggregate, event Event) error
- func EventDataToMap(data any) (map[string]any, error)
- func HasValidID(agg Aggregate) bool
- func NameToTopicUnicode(obj any) string
- func ObjectName(object any) string
- func ObjectPath(object any) string
- func SetEngine(parent context.Context, engine *Engine) context.Context
- func SetUserInfoFunc(fnc func(context.Context) UserInfo)
- func ShouldSnapshot(oldVersion, newVersion int) bool
- type Aggregate
- type AggregateBase
- func (ab *AggregateBase) AppendChanges(changes ...Event)
- func (ab *AggregateBase) Changes() Events
- func (ab *AggregateBase) ClearChanges()
- func (ab *AggregateBase) ProcessChanges(ctx context.Context, ag Aggregate) error
- func (ab *AggregateBase) Record(data ...any)
- func (ab *AggregateBase) RecordWithMetadata(data any, metadata map[string]any)
- func (a *AggregateBase) Replay(agg Aggregate, events []Event) error
- func (ab *AggregateBase) ReplayOne(agg Aggregate, event Event) error
- func (ab *AggregateBase) SetChanges(changes Events)
- func (ab *AggregateBase) SetVersion(version int64)
- func (ab *AggregateBase) Version() int64
- type AggregateDefinition
- type AggregateProjection
- type AggregateRegistry
- func (r *AggregateRegistry) Clear()
- func (r *AggregateRegistry) Get(aggName string) (Aggregate, error)
- func (r *AggregateRegistry) GetAggregate(aggName string) (Aggregate, bool)
- func (r *AggregateRegistry) GetEventCodec(aggName, evtName string) *EventCodec
- func (r *AggregateRegistry) List() iter.Seq[*AggregateRegistryItem]
- func (r *AggregateRegistry) Register(agg Aggregate, eventSamples []any) *AggregateRegistry
- type AggregateRegistryItem
- type AggregateSnapshotted
- type Applier
- type Command
- type CommandRollback
- type CommandValidator
- type Engine
- func (eng *Engine) Aggregates() *AggregateRegistry
- func (eng *Engine) CommitAggregateChanges(ctx context.Context, agg Aggregate) error
- func (eng *Engine) Execute(ctx context.Context, agg Aggregate, cmd Command) (err error)
- func (eng *Engine) IsRunning() bool
- func (eng *Engine) Load(ctx context.Context, agg Aggregate) error
- func (eng *Engine) LoadEventTypes(ctx context.Context, batchSize int, eventTypes ...any) (events []Event, err error)
- func (eng *Engine) LoadEvents(ctx context.Context, batchSize int, handle func(events []Event) error, ...) error
- func (eng *Engine) Projections() *ProjectionManager
- func (eng *Engine) RebuildProjection(ctx context.Context, projection any) error
- func (eng *Engine) RegisterAggregate(agg Aggregate, events []any, opts ...Option) error
- func (eng *Engine) RegisterProjection(proj Projection) error
- func (eng *Engine) Replay(ctx context.Context, agg Aggregate) error
- func (eng *Engine) RunProjectionOnce(ctx context.Context, projection any) error
- func (eng *Engine) RunProjectionToEnd(ctx context.Context, projection any) error
- func (eng *Engine) Send(ctx context.Context, events ...Event)
- func (eng *Engine) Start()
- func (eng *Engine) Stop()
- func (eng *Engine) Store() EventStore
- func (eng *Engine) WithStream(streams ...StreamPublisher) *Engine
- type Event
- type EventCodec
- type EventFilter
- type EventKind
- type EventProjection
- type EventState
- type EventStore
- type Events
- type EventsourcePlugin
- func (p *EventsourcePlugin) Commands() []*cobra.Command
- func (p *EventsourcePlugin) Configure(configure func(*EventsourcePlugin)) *EventsourcePlugin
- func (p *EventsourcePlugin) Deinitialize(app *golly.Application) error
- func (p *EventsourcePlugin) Engine() *Engine
- func (p *EventsourcePlugin) Initialize(app *golly.Application) error
- func (p *EventsourcePlugin) Name() string
- type HandlerCache
- type IDdProjection
- type InMemoryEvent
- type InMemoryStore
- func (s *InMemoryStore) DeleteEvent(ctx context.Context, eventID uuid.UUID) error
- func (s *InMemoryStore) Exists(ctx context.Context, eventID uuid.UUID) (bool, error)
- func (s *InMemoryStore) GlobalVersion(ctx context.Context) (int64, error)
- func (s *InMemoryStore) IncrementGlobalVersion(ctx context.Context) (int64, error)
- func (s *InMemoryStore) IsNewEvent(event Event) bool
- func (s *InMemoryStore) LoadEvents(ctx context.Context, filters ...EventFilter) ([]PersistedEvent, error)
- func (s *InMemoryStore) LoadEventsInBatches(ctx context.Context, batchSize int, handler func([]PersistedEvent) error, ...) error
- func (s *InMemoryStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (PersistedEvent, error)
- func (s *InMemoryStore) Save(ctx context.Context, events ...*Event) error
- func (s *InMemoryStore) SaveSnapshot(ctx context.Context, agg Aggregate) error
- type InternalStream
- func (s *InternalStream) Name() string
- func (s *InternalStream) Publish(ctx context.Context, topic string, evt any) error
- func (s *InternalStream) Start()
- func (s *InternalStream) Stop()
- func (s *InternalStream) Subscribe(topic string, handler StreamHandler)
- func (s *InternalStream) Unsubscribe(topic string, handler StreamHandler)
- type Job
- type NewRecordChecker
- type Option
- type Options
- type PersistedEvent
- type PluginOption
- type PluginOptions
- type Projection
- type ProjectionBase
- type ProjectionHandler
- type ProjectionManager
- func (pm *ProjectionManager) Get(projID string) (Projection, error)
- func (pm *ProjectionManager) List() iter.Seq2[string, Projection]
- func (pm *ProjectionManager) Rebuild(ctx context.Context, eng *Engine, projID string) error
- func (pm *ProjectionManager) Register(projs ...Projection)
- func (pm *ProjectionManager) RunToEnd(ctx context.Context, eng *Engine, projID string) error
- func (pm *ProjectionManager) Start()
- func (pm *ProjectionManager) Stop()
- type Publishable
- type Stream
- type StreamHandler
- type StreamLifecycle
- type StreamManager
- type StreamNamed
- type StreamOptions
- type StreamPublisher
- type StreamSubscriber
- type TestEngineOptions
- type TestEventData
- type TopicsProjection
- type UserInfo
Constants ¶
const (
AllEvents = "*"
)
const (
DefaultStreamName = "default"
)
const (
PluginName = "eventsource"
)
const (
SnapShotIncrement = 100
)
Variables ¶
var ( ErrorRepositoryIsNil = fmt.Errorf("eventstore is nil for aggregate") ErrorAggregateNotFound = fmt.Errorf("aggregate is not found in registry") ErrorNoEventsFound = fmt.Errorf("no events found matching this aggregation") ErrorNoAggregateID = fmt.Errorf("no aggregate id was defined after processing events (no such stream)") ErrorAggregateNotInitialized = fmt.Errorf("aggregate was not created properly and IsNewRecord is still true after events") )
var ErrQueueDraining = errors.New("queue is draining and not accepting new events")
var (
ErrVersionConflict = errors.New("version conflict")
)
Functions ¶
func ApplySnapshot ¶
func HasValidID ¶
func NameToTopicUnicode ¶
func ObjectName ¶
func ObjectPath ¶
func SetUserInfoFunc ¶
func ShouldSnapshot ¶
ShouldSnapshot determines if a snapshot should be created based on version increments.
Types ¶
type Aggregate ¶
type Aggregate interface {
// Record events to applied later with metadata
Record(...any)
// Record events to be applied later with metadata
RecordWithMetadata(any, map[string]any)
// Process the events into the aggregation
ProcessChanges(context.Context, Aggregate) error
// Replay events
Replay(Aggregate, []Event) error
ReplayOne(Aggregate, Event) error
// Get the ID of the aggregate it is a string so it supports
// both UUID and INT representations
GetID() string
AppendChanges(...Event)
SetChanges(Events)
ClearChanges()
Changes() Events
Version() int64
SetVersion(int64)
}
type AggregateBase ¶
type AggregateBase struct {
AggregateVersion int64 `json:"version" gorm:"column:version"`
// contains filtered or unexported fields
}
func (*AggregateBase) AppendChanges ¶
func (ab *AggregateBase) AppendChanges(changes ...Event)
func (*AggregateBase) Changes ¶
func (ab *AggregateBase) Changes() Events
func (*AggregateBase) ClearChanges ¶
func (ab *AggregateBase) ClearChanges()
func (*AggregateBase) ProcessChanges ¶
func (ab *AggregateBase) ProcessChanges(ctx context.Context, ag Aggregate) error
ProcessChanges applies all uncommitted changes to the aggregate. Each change is processed if it does not have the READY state set. Changes are updated to reflect their applied state and reattached to the aggregate.
func (*AggregateBase) Record ¶
func (ab *AggregateBase) Record(data ...any)
Record generates and tracks events for the aggregate, incrementing the version for each event. Events are stored as uncommitted changes, ready for processing by the handler.
func (*AggregateBase) RecordWithMetadata ¶
func (ab *AggregateBase) RecordWithMetadata(data any, metadata map[string]any)
Record generates and tracks events for the aggregate, incrementing the version for each event. Events are stored as uncommitted changes, ready for processing by the handler. Allows for additional metadata to be added to the event
func (*AggregateBase) Replay ¶
func (a *AggregateBase) Replay(agg Aggregate, events []Event) error
Replay applies events in the correct order to rebuild aggregate state.
func (*AggregateBase) ReplayOne ¶
func (ab *AggregateBase) ReplayOne(agg Aggregate, event Event) error
func (*AggregateBase) SetChanges ¶
func (ab *AggregateBase) SetChanges(changes Events)
func (*AggregateBase) SetVersion ¶
func (ab *AggregateBase) SetVersion(version int64)
func (*AggregateBase) Version ¶
func (ab *AggregateBase) Version() int64
type AggregateDefinition ¶
type AggregateProjection ¶
type AggregateProjection interface {
AggregateTypes() []any
}
type AggregateRegistry ¶
type AggregateRegistry struct {
// contains filtered or unexported fields
}
AggregateRegistry manages mappings of aggregate names to registry items. Internally, it uses a standard map guarded by a sync.RWMutex.
func NewAggregateRegistry ¶
func NewAggregateRegistry() *AggregateRegistry
NewAggregateRegistry initializes an empty registry with a normal map.
func (*AggregateRegistry) Clear ¶
func (r *AggregateRegistry) Clear()
Clear removes all aggregates and event codecs from the registry (helpful in tests).
func (*AggregateRegistry) Get ¶
func (r *AggregateRegistry) Get(aggName string) (Aggregate, error)
Get provides a compatibility helper returning an aggregate or error.
func (*AggregateRegistry) GetAggregate ¶
func (r *AggregateRegistry) GetAggregate(aggName string) (Aggregate, bool)
GetAggregate instantiates a new aggregate for the given name (e.g., "MyAggregate").
func (*AggregateRegistry) GetEventCodec ¶
func (r *AggregateRegistry) GetEventCodec(aggName, evtName string) *EventCodec
GetEventCodec retrieves the EventCodec for (aggregateName, eventName).
func (*AggregateRegistry) List ¶
func (r *AggregateRegistry) List() iter.Seq[*AggregateRegistryItem]
List returns an iterator over all registered aggregate types. This can be used with Go 1.23+ range-over-func:
for agg := range registry.List() {
// use agg
}
func (*AggregateRegistry) Register ¶
func (r *AggregateRegistry) Register(agg Aggregate, eventSamples []any) *AggregateRegistry
Register takes an aggregate (e.g., &MyAggregate{}) and a slice of event “samples”. For each event sample, it precomputes an EventCodec and stores it in the registry.
type AggregateRegistryItem ¶
type AggregateRegistryItem struct {
AggregateType reflect.Type
Events map[string]*EventCodec
}
AggregateRegistryItem represents a single aggregate type (e.g., *Order) and its associated event codecs (e.g., "OrderCreated" -> {marshal/unmarshal}).
type AggregateSnapshotted ¶
type AggregateSnapshotted struct {
State []byte
}
AggregateSnapshottedEvent represents a snapshot of the aggregate's state.
type CommandRollback ¶
type CommandValidator ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the main entry point for your event-sourced system. It contains:
- An EventStore for persistence
- A ProjectionManager for building read models
- A StreamManager for all event publishing (internal projections and external producers)
func DefaultEngine ¶
func DefaultEngine() *Engine
func FromApp ¶
func FromApp(app *golly.Application) *Engine
func NewInMemoryEngine ¶
func NewInMemoryEngine(opts TestEngineOptions) *Engine
NewInMemoryEngine creates a new in-memory engine with the given options. for testing purposes.
func (*Engine) Aggregates ¶
func (eng *Engine) Aggregates() *AggregateRegistry
Aggregates returns the underlying aggregate registry
func (*Engine) CommitAggregateChanges ¶
CommitAggregateChanges applies a series of new events from an aggregate:
- Increments a global version.
- Saves the events to the event store.
- Publishes them to the bus.
- Marks the aggregate's changes as complete.
func (*Engine) Execute ¶
Execute handles command execution, including loading the aggregate, replaying events, validating, and persisting changes.
func (*Engine) LoadEventTypes ¶
func (*Engine) LoadEvents ¶
func (*Engine) Projections ¶
func (eng *Engine) Projections() *ProjectionManager
Projections returns the underlying projection manager
func (*Engine) RebuildProjection ¶
RebuildProjection rebuilds a single projection
func (*Engine) RegisterAggregate ¶
func (*Engine) RegisterProjection ¶
func (eng *Engine) RegisterProjection(proj Projection) error
RegisterProjection registers a projection to the stream manager RegisterProjection registers a projection with the projection manager
func (*Engine) RunProjectionOnce ¶
RunProjectionOnce runs the projection once
func (*Engine) RunProjectionToEnd ¶
RunProjectionToEnd runs the projection to the end of the event stream
func (*Engine) Send ¶
Send publishes events to all registered streams (internal projections and external producers)
func (*Engine) Start ¶
func (eng *Engine) Start()
Start marks the engine running and starts projections and streams
func (*Engine) Stop ¶
func (eng *Engine) Stop()
Stop stops the engine, gracefully draining all projection events
func (*Engine) Store ¶
func (eng *Engine) Store() EventStore
Store returns the underlying event store (if you need direct access)
func (*Engine) WithStream ¶
func (eng *Engine) WithStream(streams ...StreamPublisher) *Engine
WithStream adds streams for event publishing (internal or external)
type Event ¶
type Event struct {
ID uuid.UUID `json:"id"`
Kind EventKind `json:"kind"`
CreatedAt time.Time `json:"createdAt"`
Type string `json:"eventType" gorm:"index:idx_events_type"`
AggregateID string `json:"aggregateId" gorm:"index:idx_events_aggregate"`
AggregateType string `json:"aggregateType" gorm:"index:idx_events_aggregate"`
Version int64 `json:"version"`
GlobalVersion int64 `json:"globalVersion"`
State EventState `json:"state,omitempty" gorm:"-"`
Data interface{} `json:"data" gorm:"-"`
TenantID string `json:"tenantID"`
UserID string `json:"userID"`
Topic string `json:"topic"`
}
Event represents a single event in the system.
func NewEvent ¶
func NewEvent(data any, state EventState) Event
NewEvent creates a new Event with the provided data and aggregate information.
func NewSnapshot ¶
NewSnapshot creates a snapshot event by wrapping the aggregate as the Data field.
func (*Event) GetState ¶
func (e *Event) GetState() EventState
GetState returns the current state of the event.
func (*Event) InState ¶
func (e *Event) InState(s EventState) bool
HasState checks if the event has a specific state.
func (*Event) SetState ¶
func (e *Event) SetState(s EventState)
SetState adds a state to the event if it doesn't exist.
type EventCodec ¶
EventCodec holds closures for marshalling/unmarshalling a particular event type.
type EventFilter ¶
type EventFilter struct {
AggregateType string // Filter by aggregate type (e.g., "Order", "User")
AggregateID string // Filter by a specific aggregate ID
AggregateTypes []string // Filter by multiple aggregate types
EventType []string // Filter by event type (e.g., "OrderCreated", "UserUpdated")
Topics []string // Filter by topics (e.g., "order", "user")
FromVersion int // Minimum event version to load
ToVersion int // Maximum event version to load
FromGlobalVersion int // Minimum global version to load
ToGlobalVersion int // Maximum global version to load
FromTime time.Time // Events created after this time
ToTime time.Time // Events created before this time
Limit int // Maximum number of events to return
}
EventFilter represents criteria for filtering events during retrieval.
type EventProjection ¶
type EventProjection interface {
EventTypes() []any
}
type EventState ¶
type EventState string
EventState represents the state of an event using bitwise flags.
const ( EventStateReady EventState = "ready" EventStateApplied EventState = "applied" EventStateFailed EventState = "failed" EventStateCompleted EventState = "completed" EventStateRetry EventState = "retry" EventStateCanceled EventState = "canceled" )
type EventStore ¶
type EventStore interface {
// Save persists one or more events to the event store.
// Events are stored atomically to ensure consistency.
Save(ctx context.Context, events ...*Event) error
// LoadEvents retrieves all events across different aggregates.
// This can be useful for projections or rebuilding read models. (This is expected to return in order of GlobalVersion)
LoadEvents(ctx context.Context, filters ...EventFilter) ([]PersistedEvent, error)
// LoadEventsInBatches loads the events in batches of batch size calling handler for each batch
// if handler returns error processing is stopped (These is expected to return in order of GlobalVersion)
LoadEventsInBatches(ctx context.Context, batchSize int, handler func([]PersistedEvent) error, filters ...EventFilter) error
// IsNewEvent checks if an event is new by inspecting its metadata (e.g., ID, version).
IsNewEvent(event Event) bool
// Exists checks if an event exists by its ID or unique key.
Exists(ctx context.Context, eventID uuid.UUID) (bool, error)
// DeleteEvent removes an event by ID. Useful for GDPR or soft deletes.
DeleteEvent(ctx context.Context, eventID uuid.UUID) error
// AggregateSnapshot persists a snapshot of an aggregate state.
SaveSnapshot(ctx context.Context, snapshot Aggregate) error
// LoadSnapshot retrieves the latest snapshot of an aggregate for faster loading.
LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (PersistedEvent, error)
IncrementGlobalVersion(ctx context.Context) (int64, error)
}
EventStore is an interface for managing event persistence and retrieval.
type Events ¶
type Events []Event
func (Events) ByState ¶
func (evts Events) ByState(state EventState) Events
func (Events) MarkComplete ¶
func (Events) MarkFailed ¶
func (Events) Uncommitted ¶
type EventsourcePlugin ¶
type EventsourcePlugin struct {
// contains filtered or unexported fields
}
Plugin implements the Plugin interface for the eventsource
func NewPlugin ¶
func NewPlugin(opts ...PluginOption) *EventsourcePlugin
NewPlugin creates a new Plugin with the given store
func Plugin ¶
func Plugin() *EventsourcePlugin
Plugin returns the eventsource plugin from the application
func (*EventsourcePlugin) Commands ¶
func (p *EventsourcePlugin) Commands() []*cobra.Command
Commands returns the list of CLI commands provided by the plugin
func (*EventsourcePlugin) Configure ¶
func (p *EventsourcePlugin) Configure(configure func(*EventsourcePlugin)) *EventsourcePlugin
ConfigureEngine allows additional configuration or usage of the engine
func (*EventsourcePlugin) Deinitialize ¶
func (p *EventsourcePlugin) Deinitialize(app *golly.Application) error
Deinitialize stops the engine
func (*EventsourcePlugin) Engine ¶
func (p *EventsourcePlugin) Engine() *Engine
Engine returns the initialized engine
func (*EventsourcePlugin) Initialize ¶
func (p *EventsourcePlugin) Initialize(app *golly.Application) error
Initialize sets up the engine and starts it
func (*EventsourcePlugin) Name ¶
func (p *EventsourcePlugin) Name() string
Name returns the name of the plugin
type HandlerCache ¶
type HandlerCache struct {
// contains filtered or unexported fields
}
func NewHandlerCache ¶
func NewHandlerCache() *HandlerCache
func (*HandlerCache) Reset ¶
func (hc *HandlerCache) Reset()
type IDdProjection ¶
type IDdProjection interface {
ID() string
}
type InMemoryEvent ¶
type InMemoryEvent struct {
Event
}
InMemoryEvent wraps an Event and implements the PersistedEvent interface. It can be as simple as returning the embedded Event on Hydrate(...).
func (InMemoryEvent) GlobalVersion ¶
func (t InMemoryEvent) GlobalVersion() int64
type InMemoryStore ¶
type InMemoryStore struct {
SaveFail error
// contains filtered or unexported fields
}
InMemoryStore is an in-memory implementation of EventStore for testing purposes.
func NewInMemoryStore ¶
func NewInMemoryStore(data ...Event) *InMemoryStore
func (*InMemoryStore) DeleteEvent ¶
DeleteEvent removes an event by ID.
func (*InMemoryStore) GlobalVersion ¶
func (s *InMemoryStore) GlobalVersion(ctx context.Context) (int64, error)
GlobalVersion returns the highest global version in this test store.
func (*InMemoryStore) IncrementGlobalVersion ¶
func (s *InMemoryStore) IncrementGlobalVersion(ctx context.Context) (int64, error)
func (*InMemoryStore) IsNewEvent ¶
func (s *InMemoryStore) IsNewEvent(event Event) bool
IsNewEvent checks if an event is new by aggregate ID and version.
func (*InMemoryStore) LoadEvents ¶
func (s *InMemoryStore) LoadEvents(ctx context.Context, filters ...EventFilter) ([]PersistedEvent, error)
LoadEvents retrieves all events (optionally filtered) as []PersistedEvent.
func (*InMemoryStore) LoadEventsInBatches ¶
func (s *InMemoryStore) LoadEventsInBatches( ctx context.Context, batchSize int, handler func([]PersistedEvent) error, filters ...EventFilter, ) error
LoadEventsInBatches loads events in ascending global version order in batches.
func (*InMemoryStore) LoadSnapshot ¶
func (s *InMemoryStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (PersistedEvent, error)
LoadSnapshot retrieves the latest snapshot for an aggregate as a PersistedEvent.
func (*InMemoryStore) Save ¶
func (s *InMemoryStore) Save(ctx context.Context, events ...*Event) error
Save persists one or more events atomically in memory.
func (*InMemoryStore) SaveSnapshot ¶
func (s *InMemoryStore) SaveSnapshot(ctx context.Context, agg Aggregate) error
SaveSnapshot persists a snapshot of an aggregate in memory.
type InternalStream ¶
type InternalStream struct {
// contains filtered or unexported fields
}
InternalStream is a simple stream for projections and in-memory subscriptions No complex ordering - just process events as they arrive
func NewInternalStream ¶
func NewInternalStream(name string) *InternalStream
NewInternalStream creates a simple internal stream for projections
func (*InternalStream) Subscribe ¶
func (s *InternalStream) Subscribe(topic string, handler StreamHandler)
Subscribe registers a handler for a topic
func (*InternalStream) Unsubscribe ¶
func (s *InternalStream) Unsubscribe(topic string, handler StreamHandler)
Unsubscribe removes a handler for a topic
type NewRecordChecker ¶
type NewRecordChecker interface {
IsNewRecord() bool
}
type Option ¶
type Option func(*Options)
Option is a function that configures a projection registration or other engine-related setup.
func WithStore ¶
func WithStore(store EventStore) Option
WithStore configures the Engine to use the provided EventStore
func WithStreamBlockedTimeout ¶
deprecated: bus defined options
func WithStreamBufferSize ¶
deprecated: bus defined options
func WithStreamPartitions ¶
deprecated: bus defined options
func WithStreams ¶
func WithStreams(streams ...StreamPublisher) Option
type Options ¶
type Options struct {
Store EventStore
Stream *StreamOptions
Streams []StreamPublisher
}
Options holds all possible configuration parameters that can be adjusted via Option functions.
type PersistedEvent ¶
type PluginOption ¶
type PluginOption func(*PluginOptions)
func PluginWithEngine ¶
func PluginWithEngine(engine *Engine) PluginOption
func PluginWithStore ¶
func PluginWithStore(store EventStore) PluginOption
func PluginWithStreams ¶
func PluginWithStreams(streams ...StreamPublisher) PluginOption
func PluginWithUserInfoFunc ¶
func PluginWithUserInfoFunc(fnc func(context.Context) UserInfo) PluginOption
type PluginOptions ¶
type PluginOptions struct {
// contains filtered or unexported fields
}
type Projection ¶
type ProjectionBase ¶
type ProjectionBase struct {
// contains filtered or unexported fields
}
ProjectionBase is an embeddable helper for common Projection logic.
func (*ProjectionBase) Position ¶
func (p *ProjectionBase) Position(context.Context) int64
Position/SetPosition with atomic
func (*ProjectionBase) Reset ¶
func (p *ProjectionBase) Reset(ctx context.Context) error
Reset sets position to -1
func (*ProjectionBase) SetPosition ¶
func (p *ProjectionBase) SetPosition(ctx context.Context, pos int64) error
type ProjectionManager ¶
type ProjectionManager struct {
// contains filtered or unexported fields
}
ProjectionManager manages multiple projections, each identified by a key.
func NewProjectionManager ¶
func NewProjectionManager() *ProjectionManager
NewProjectionManager creates a ProjectionManager with an empty registry.
func (*ProjectionManager) Get ¶
func (pm *ProjectionManager) Get(projID string) (Projection, error)
Get returns a projection by ID. Returns an error if the projection is not found. Returns nil if the projection is found. Returns nil if the projection is found.
func (*ProjectionManager) List ¶
func (pm *ProjectionManager) List() iter.Seq2[string, Projection]
List returns an iterator over all registered projections with their IDs. Snapshot is taken when List() is called. Iteration order is deterministic (sorted by ID). Compatible with Go 1.23+ range-over-func using iter.Seq2.
for id, proj := range manager.List() {
fmt.Printf("Projection %s: %v\n", id, proj)
}
func (*ProjectionManager) Rebuild ¶
Rebuild resets a single projection, then processes all events from version 0 upward.
func (*ProjectionManager) Register ¶
func (pm *ProjectionManager) Register(projs ...Projection)
Register adds one or more projections to the manager and indexes them by their filters.
func (*ProjectionManager) RunToEnd ¶
RunToEnd catches up a single projection from its current position to the end of the stream.
func (*ProjectionManager) Start ¶
func (pm *ProjectionManager) Start()
Start begins async processing of projection events
func (*ProjectionManager) Stop ¶
func (pm *ProjectionManager) Stop()
Stop gracefully shuts down projection processing, draining all in-flight events
type Publishable ¶
type Publishable interface {
Topic() string
}
type Stream ¶
type Stream struct {
*InternalStream
}
Stream represents a single in-memory event stream with subscriptions. This is now just a wrapper around InternalStream for backward compatibility.
func NewStream ¶
func NewStream(opts StreamOptions) *Stream
NewStream initializes a new Stream with options
type StreamHandler ¶
type StreamLifecycle ¶
type StreamLifecycle interface {
Start()
Stop()
}
StreamLifecycle allows external lifecycle control; optional.
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages streams and coordinates dispatch.
func NewStreamManager ¶
func NewStreamManager() *StreamManager
NewStreamManager initializes a new StreamManager.
func (*StreamManager) Add ¶
func (sm *StreamManager) Add(streams ...StreamPublisher)
Add registers streams for publish fanout.
func (*StreamManager) Publish ¶
func (sm *StreamManager) Publish(ctx context.Context, topic string, events ...Event)
Publish publishes events to all streams.
func (*StreamManager) Start ¶
func (sm *StreamManager) Start()
Start starts streams that implement lifecycle.
func (*StreamManager) Stop ¶
func (sm *StreamManager) Stop()
Stop stops streams that implement lifecycle.
func (*StreamManager) Subscribe ¶
func (sm *StreamManager) Subscribe(topic string, handler StreamHandler) bool
Subscribe subscribes the handler to the first subscribable stream.
type StreamNamed ¶
type StreamNamed interface{ Name() string }
StreamNamed provides a debug/metrics-friendly name for a stream.
type StreamOptions ¶
type StreamPublisher ¶
StreamPublisher can publish an event to a topic on this stream (in-memory or outbound).
type StreamSubscriber ¶
type StreamSubscriber interface {
Subscribe(topic string, handler StreamHandler)
}
StreamSubscriber can subscribe a handler to receive events for a topic ("*" for all).
type TestEngineOptions ¶
type TestEngineOptions struct {
Aggregates []AggregateDefinition
Projections []Projection
Data []any
Streams []StreamPublisher
UserInfoFunc func(context.Context) UserInfo
}
type TestEventData ¶
type TopicsProjection ¶
type TopicsProjection interface {
Topics() []string
}
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
eventstores
|
|
|
gorm
module
|
|
|
gormstore
module
|
|
|
examples
|
|
|
aggregates
command
|
|
|
basic
command
|
|
|
full
command
|
|
|
middleware
command
|
|
|
projections
command
|
|
|
repositories
|
|
|
mongo
module
|
|
|
vector
module
|