eventsource

package module
v0.0.0-...-25f5ff4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2026 License: MIT Imports: 17 Imported by: 17

README

Event Sourcing with Golang

This project provides a robust event-sourcing framework for Golang, designed for building scalable, event-driven applications. It enables the use of aggregates, streams, and event handlers to ensure consistent and traceable state changes.

Overview

Event sourcing is a pattern that records changes in application state as immutable events. This approach provides several advantages:

  • Auditability: Every state change is stored as an immutable event.
  • Reconstruction: System state can be rebuilt by replaying historical events.
  • Flexibility: The same event stream can power multiple projections or views.
  • Concurrency: Supports concurrent event processing.
  • Scalability: Uses partitioned streams for high-throughput event handling.

Features

  • Aggregate Management: Type-safe aggregates with automatic event application.
  • Stream Processing: Configurable event streams with partitioning support.
  • Projections: Supports real-time and rebuildable read models.
  • Event Store: Pluggable storage backends (including PostgreSQL and in-memory storage).
  • Command Handling: Structured command processing with validation.
  • Snapshots: Automatic aggregate state snapshots for improved performance.

Installation

Install the package using:

go get github.com/golly-go/plugins/eventsource

Quick Start

1. Define Your Domain Events

Define the domain events representing state changes:

type OrderCreated struct {
    ID         string
    CustomerID string
    Amount     float64
}

type OrderStatusChanged struct {
    ID     string
    Status string
}
2. Create an Aggregate

Define an aggregate that maintains state and applies events:

type Order struct {
    eventsource.AggregateBase
    ID      string
    Status  string
    Amount  float64
}

func (o *Order) GetID() string {
    return o.ID
}

// Automatically applies event handlers
func (o *Order) ApplyOrderCreated(event OrderCreated) {
    o.ID = event.ID
    o.Amount = event.Amount
}

func (o *Order) ApplyOrderStatusChanged(event OrderStatusChanged) {
    o.Status = event.Status
}
3. Set Up the Engine

Initialize the event-sourcing engine and register aggregates:

// Initialize with a preferred store
engine := eventsource.NewEngine(&eventsource.InMemoryStore{})

// Register aggregates and their events
engine.RegisterAggregate(&Order{}, []any{
    OrderCreated{},
    OrderStatusChanged{},
})

// Start processing
engine.Start()
defer engine.Stop()
4. Create a Projection

Projections allow building read models from events:

type OrderSummary struct {
    eventsource.ProjectionBase
    mu          sync.RWMutex
    TotalOrders int
    TotalAmount float64
}

func (o *OrderSummary) HandleEvent(ctx *golly.Context, evt eventsource.Event) error {
    o.mu.Lock()
    defer o.mu.Unlock()

    switch e := evt.Data.(type) {
    case OrderCreated:
        o.TotalOrders++
        o.TotalAmount += e.Amount
    }
    return nil
}

// Register projection
summary := &OrderSummary{}
engine.RegisterProjection(summary,
    eventsource.WithStream("orders", true, 4))
5. Execute Commands

Commands trigger state changes within aggregates:

type CreateOrder struct {
    ID     string
    Amount float64
}

func (c CreateOrder) Perform(ctx *golly.Context, agg eventsource.Aggregate) error {
    order := agg.(*Order)
    order.Record(OrderCreated{
        ID:     c.ID,
        Amount: c.Amount,
    })
    return nil
}

// Execute the command
order := &Order{}
err := engine.Execute(ctx, order, CreateOrder{
    ID:     "order_1",
    Amount: 99.99,
})

Advanced Features

Event Store Configuration

The library supports multiple event store implementations:

// PostgreSQL Store
store := &gormstore.Store{
    DB: db, // Your GORM DB instance
}

// In-Memory Store (ideal for testing)
store := &eventsource.InMemoryStore{}

engine := eventsource.NewEngine(store)
Stream Partitioning

Configure streams with custom partitioning for improved scalability:

engine.RegisterProjection(projection,
    eventsource.WithStream("orders", true, 8),  // 8 partitions
    eventsource.WithStreamBufferSize(1000))
Snapshots

Enable automatic snapshots to speed up aggregate loading:

// Set snapshot frequency
engine.SetSnapshotFrequency(100) // Every 100 events

// Load aggregate using latest snapshot
agg, err := engine.Load(ctx, "Order", "order_123")

Testing

The library includes an in-memory store ideal for testing:

func TestOrderCreation(t *testing.T) {
    engine := eventsource.NewEngine(&eventsource.InMemoryStore{})
    order := &Order{}
    
    err := engine.Execute(ctx, order, CreateOrder{
        ID:     "test_1",
        Amount: 100,
    })
    
    assert.NoError(t, err)
    assert.Equal(t, 100.0, order.Amount)
}

Contributing

Contributions are welcome! Please submit a Pull Request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	AllEvents = "*"
)
View Source
const (
	DefaultStreamName = "default"
)
View Source
const (
	PluginName = "eventsource"
)
View Source
const (
	SnapShotIncrement = 100
)

Variables

View Source
var (
	ErrorRepositoryIsNil         = fmt.Errorf("eventstore is nil for aggregate")
	ErrorAggregateNotFound       = fmt.Errorf("aggregate is not found in registry")
	ErrorNoEventsFound           = fmt.Errorf("no events found matching this aggregation")
	ErrorNoAggregateID           = fmt.Errorf("no aggregate id was defined after processing events (no such stream)")
	ErrorAggregateNotInitialized = fmt.Errorf("aggregate was not created properly and IsNewRecord is still true after events")
)
View Source
var ErrQueueDraining = errors.New("queue is draining and not accepting new events")
View Source
var (
	ErrVersionConflict = errors.New("version conflict")
)

Functions

func ApplySnapshot

func ApplySnapshot(ag Aggregate, event Event) error

func EventDataToMap

func EventDataToMap(data any) (map[string]any, error)

func HasValidID

func HasValidID(agg Aggregate) bool

func NameToTopicUnicode

func NameToTopicUnicode(obj any) string

func ObjectName

func ObjectName(object any) string

func ObjectPath

func ObjectPath(object any) string

func SetEngine

func SetEngine(parent context.Context, engine *Engine) context.Context

func SetUserInfoFunc

func SetUserInfoFunc(fnc func(context.Context) UserInfo)

func ShouldSnapshot

func ShouldSnapshot(oldVersion, newVersion int) bool

ShouldSnapshot determines if a snapshot should be created based on version increments.

Types

type Aggregate

type Aggregate interface {
	// Record events to applied later with metadata
	Record(...any)

	// Record events to be applied later with metadata
	RecordWithMetadata(any, map[string]any)

	// Process the events into the aggregation
	ProcessChanges(context.Context, Aggregate) error

	// Replay events
	Replay(Aggregate, []Event) error
	ReplayOne(Aggregate, Event) error

	// Get the ID of the aggregate it is a string so it supports
	// both UUID and INT representations
	GetID() string

	AppendChanges(...Event)
	SetChanges(Events)
	ClearChanges()

	Changes() Events

	Version() int64
	SetVersion(int64)
}

type AggregateBase

type AggregateBase struct {
	AggregateVersion int64 `json:"version" gorm:"column:version"`
	// contains filtered or unexported fields
}

func (*AggregateBase) AppendChanges

func (ab *AggregateBase) AppendChanges(changes ...Event)

func (*AggregateBase) Changes

func (ab *AggregateBase) Changes() Events

func (*AggregateBase) ClearChanges

func (ab *AggregateBase) ClearChanges()

func (*AggregateBase) ProcessChanges

func (ab *AggregateBase) ProcessChanges(ctx context.Context, ag Aggregate) error

ProcessChanges applies all uncommitted changes to the aggregate. Each change is processed if it does not have the READY state set. Changes are updated to reflect their applied state and reattached to the aggregate.

func (*AggregateBase) Record

func (ab *AggregateBase) Record(data ...any)

Record generates and tracks events for the aggregate, incrementing the version for each event. Events are stored as uncommitted changes, ready for processing by the handler.

func (*AggregateBase) RecordWithMetadata

func (ab *AggregateBase) RecordWithMetadata(data any, metadata map[string]any)

Record generates and tracks events for the aggregate, incrementing the version for each event. Events are stored as uncommitted changes, ready for processing by the handler. Allows for additional metadata to be added to the event

func (*AggregateBase) Replay

func (a *AggregateBase) Replay(agg Aggregate, events []Event) error

Replay applies events in the correct order to rebuild aggregate state.

func (*AggregateBase) ReplayOne

func (ab *AggregateBase) ReplayOne(agg Aggregate, event Event) error

func (*AggregateBase) SetChanges

func (ab *AggregateBase) SetChanges(changes Events)

func (*AggregateBase) SetVersion

func (ab *AggregateBase) SetVersion(version int64)

func (*AggregateBase) Version

func (ab *AggregateBase) Version() int64

type AggregateDefinition

type AggregateDefinition struct {
	Aggregate Aggregate
	Events    []any
}

type AggregateProjection

type AggregateProjection interface {
	AggregateTypes() []any
}

type AggregateRegistry

type AggregateRegistry struct {
	// contains filtered or unexported fields
}

AggregateRegistry manages mappings of aggregate names to registry items. Internally, it uses a standard map guarded by a sync.RWMutex.

func NewAggregateRegistry

func NewAggregateRegistry() *AggregateRegistry

NewAggregateRegistry initializes an empty registry with a normal map.

func (*AggregateRegistry) Clear

func (r *AggregateRegistry) Clear()

Clear removes all aggregates and event codecs from the registry (helpful in tests).

func (*AggregateRegistry) Get

func (r *AggregateRegistry) Get(aggName string) (Aggregate, error)

Get provides a compatibility helper returning an aggregate or error.

func (*AggregateRegistry) GetAggregate

func (r *AggregateRegistry) GetAggregate(aggName string) (Aggregate, bool)

GetAggregate instantiates a new aggregate for the given name (e.g., "MyAggregate").

func (*AggregateRegistry) GetEventCodec

func (r *AggregateRegistry) GetEventCodec(aggName, evtName string) *EventCodec

GetEventCodec retrieves the EventCodec for (aggregateName, eventName).

func (*AggregateRegistry) List

List returns an iterator over all registered aggregate types. This can be used with Go 1.23+ range-over-func:

for agg := range registry.List() {
    // use agg
}

func (*AggregateRegistry) Register

func (r *AggregateRegistry) Register(agg Aggregate, eventSamples []any) *AggregateRegistry

Register takes an aggregate (e.g., &MyAggregate{}) and a slice of event “samples”. For each event sample, it precomputes an EventCodec and stores it in the registry.

type AggregateRegistryItem

type AggregateRegistryItem struct {
	AggregateType reflect.Type
	Events        map[string]*EventCodec
}

AggregateRegistryItem represents a single aggregate type (e.g., *Order) and its associated event codecs (e.g., "OrderCreated" -> {marshal/unmarshal}).

type AggregateSnapshotted

type AggregateSnapshotted struct {
	State []byte
}

AggregateSnapshottedEvent represents a snapshot of the aggregate's state.

type Applier

type Applier interface {
	// Repo(golly.Context) Repository
	Apply(Event)
}

type Command

type Command interface {
	Perform(context.Context, Aggregate) error
}

type CommandRollback

type CommandRollback interface {
	Rollback(context.Context, Aggregate, error)
}

type CommandValidator

type CommandValidator interface {
	Validate(context.Context, Aggregate) error
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the main entry point for your event-sourced system. It contains:

  • An EventStore for persistence
  • A ProjectionManager for building read models
  • A StreamManager for all event publishing (internal projections and external producers)

func DefaultEngine

func DefaultEngine() *Engine

func FromApp

func FromApp(app *golly.Application) *Engine

func GetEngine

func GetEngine(tracker any) *Engine

func NewEngine

func NewEngine(opts ...Option) *Engine

NewEngine allows configuring the engine via Option by building a config first.

func NewInMemoryEngine

func NewInMemoryEngine(opts TestEngineOptions) *Engine

NewInMemoryEngine creates a new in-memory engine with the given options. for testing purposes.

func (*Engine) Aggregates

func (eng *Engine) Aggregates() *AggregateRegistry

Aggregates returns the underlying aggregate registry

func (*Engine) CommitAggregateChanges

func (eng *Engine) CommitAggregateChanges(ctx context.Context, agg Aggregate) error

CommitAggregateChanges applies a series of new events from an aggregate:

  1. Increments a global version.
  2. Saves the events to the event store.
  3. Publishes them to the bus.
  4. Marks the aggregate's changes as complete.

func (*Engine) Execute

func (eng *Engine) Execute(ctx context.Context, agg Aggregate, cmd Command) (err error)

Execute handles command execution, including loading the aggregate, replaying events, validating, and persisting changes.

func (*Engine) IsRunning

func (eng *Engine) IsRunning() bool

func (*Engine) Load

func (eng *Engine) Load(ctx context.Context, agg Aggregate) error

func (*Engine) LoadEventTypes

func (eng *Engine) LoadEventTypes(ctx context.Context, batchSize int, eventTypes ...any) (events []Event, err error)

func (*Engine) LoadEvents

func (eng *Engine) LoadEvents(
	ctx context.Context,
	batchSize int,
	handle func(events []Event) error,
	filter ...EventFilter,
) error

func (*Engine) Projections

func (eng *Engine) Projections() *ProjectionManager

Projections returns the underlying projection manager

func (*Engine) RebuildProjection

func (eng *Engine) RebuildProjection(ctx context.Context, projection any) error

RebuildProjection rebuilds a single projection

func (*Engine) RegisterAggregate

func (eng *Engine) RegisterAggregate(agg Aggregate, events []any, opts ...Option) error

func (*Engine) RegisterProjection

func (eng *Engine) RegisterProjection(proj Projection) error

RegisterProjection registers a projection to the stream manager RegisterProjection registers a projection with the projection manager

func (*Engine) Replay

func (eng *Engine) Replay(ctx context.Context, agg Aggregate) error

func (*Engine) RunProjectionOnce

func (eng *Engine) RunProjectionOnce(ctx context.Context, projection any) error

RunProjectionOnce runs the projection once

func (*Engine) RunProjectionToEnd

func (eng *Engine) RunProjectionToEnd(ctx context.Context, projection any) error

RunProjectionToEnd runs the projection to the end of the event stream

func (*Engine) Send

func (eng *Engine) Send(ctx context.Context, events ...Event)

Send publishes events to all registered streams (internal projections and external producers)

func (*Engine) Start

func (eng *Engine) Start()

Start marks the engine running and starts projections and streams

func (*Engine) Stop

func (eng *Engine) Stop()

Stop stops the engine, gracefully draining all projection events

func (*Engine) Store

func (eng *Engine) Store() EventStore

Store returns the underlying event store (if you need direct access)

func (*Engine) WithStream

func (eng *Engine) WithStream(streams ...StreamPublisher) *Engine

WithStream adds streams for event publishing (internal or external)

type Event

type Event struct {
	ID            uuid.UUID   `json:"id"`
	Kind          EventKind   `json:"kind"`
	CreatedAt     time.Time   `json:"createdAt"`
	Type          string      `json:"eventType" gorm:"index:idx_events_type"`
	AggregateID   string      `json:"aggregateId" gorm:"index:idx_events_aggregate"`
	AggregateType string      `json:"aggregateType" gorm:"index:idx_events_aggregate"`
	Version       int64       `json:"version"`
	GlobalVersion int64       `json:"globalVersion"`
	State         EventState  `json:"state,omitempty" gorm:"-"`
	Data          interface{} `json:"data" gorm:"-"`
	TenantID      string      `json:"tenantID"`
	UserID        string      `json:"userID"`
	Topic         string      `json:"topic"`
}

Event represents a single event in the system.

func NewEvent

func NewEvent(data any, state EventState) Event

NewEvent creates a new Event with the provided data and aggregate information.

func NewSnapshot

func NewSnapshot(aggregate Aggregate) Event

NewSnapshot creates a snapshot event by wrapping the aggregate as the Data field.

func (*Event) GetState

func (e *Event) GetState() EventState

GetState returns the current state of the event.

func (*Event) Hydrate

func (e *Event) Hydrate(engine *Engine, data, metadata any) error

func (*Event) InState

func (e *Event) InState(s EventState) bool

HasState checks if the event has a specific state.

func (*Event) SetID

func (e *Event) SetID(id uuid.UUID)

SetID assigns a UUID to the event.

func (*Event) SetState

func (e *Event) SetState(s EventState)

SetState adds a state to the event if it doesn't exist.

type EventCodec

type EventCodec struct {
	UnmarshalFn func(any) (any, error)
	MarshalFn   func(any) ([]byte, error)
}

EventCodec holds closures for marshalling/unmarshalling a particular event type.

type EventFilter

type EventFilter struct {
	AggregateType  string   // Filter by aggregate type (e.g., "Order", "User")
	AggregateID    string   // Filter by a specific aggregate ID
	AggregateTypes []string // Filter by multiple aggregate types
	EventType      []string // Filter by event type (e.g., "OrderCreated", "UserUpdated")
	Topics         []string // Filter by topics (e.g., "order", "user")

	FromVersion int // Minimum event version to load
	ToVersion   int // Maximum event version to load

	FromGlobalVersion int // Minimum global version to load
	ToGlobalVersion   int // Maximum global version to load

	FromTime time.Time // Events created after this time

	ToTime time.Time // Events created before this time
	Limit  int       // Maximum number of events to return

}

EventFilter represents criteria for filtering events during retrieval.

type EventKind

type EventKind string
const (
	EventKindSnapshot EventKind = "snapshot"
	EventKindEvent    EventKind = "event"
)

type EventProjection

type EventProjection interface {
	EventTypes() []any
}

type EventState

type EventState string

EventState represents the state of an event using bitwise flags.

const (
	EventStateReady     EventState = "ready"
	EventStateApplied   EventState = "applied"
	EventStateFailed    EventState = "failed"
	EventStateCompleted EventState = "completed"
	EventStateRetry     EventState = "retry"
	EventStateCanceled  EventState = "canceled"
)

type EventStore

type EventStore interface {
	// Save persists one or more events to the event store.
	// Events are stored atomically to ensure consistency.
	Save(ctx context.Context, events ...*Event) error

	// LoadEvents retrieves all events across different aggregates.
	// This can be useful for projections or rebuilding read models. (This is expected to return in order of GlobalVersion)
	LoadEvents(ctx context.Context, filters ...EventFilter) ([]PersistedEvent, error)

	// LoadEventsInBatches loads the events in batches of batch size calling handler for each batch
	// if handler returns error processing is stopped (These is expected to return in order of GlobalVersion)
	LoadEventsInBatches(ctx context.Context, batchSize int, handler func([]PersistedEvent) error, filters ...EventFilter) error

	// IsNewEvent checks if an event is new by inspecting its metadata (e.g., ID, version).
	IsNewEvent(event Event) bool

	// Exists checks if an event exists by its ID or unique key.
	Exists(ctx context.Context, eventID uuid.UUID) (bool, error)

	// DeleteEvent removes an event by ID. Useful for GDPR or soft deletes.
	DeleteEvent(ctx context.Context, eventID uuid.UUID) error

	// AggregateSnapshot persists a snapshot of an aggregate state.
	SaveSnapshot(ctx context.Context, snapshot Aggregate) error

	// LoadSnapshot retrieves the latest snapshot of an aggregate for faster loading.
	LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (PersistedEvent, error)

	IncrementGlobalVersion(ctx context.Context) (int64, error)
}

EventStore is an interface for managing event persistence and retrieval.

type Events

type Events []Event

func (Events) ByState

func (evts Events) ByState(state EventState) Events

func (Events) Completed

func (evts Events) Completed() Events

func (Events) MarkComplete

func (evts Events) MarkComplete() Events

func (Events) MarkFailed

func (evts Events) MarkFailed() Events

func (Events) Ptr

func (evts Events) Ptr() []*Event

func (Events) Ready

func (evts Events) Ready() Events

func (Events) Uncommitted

func (evts Events) Uncommitted() Events

type EventsourcePlugin

type EventsourcePlugin struct {
	// contains filtered or unexported fields
}

Plugin implements the Plugin interface for the eventsource

func NewPlugin

func NewPlugin(opts ...PluginOption) *EventsourcePlugin

NewPlugin creates a new Plugin with the given store

func Plugin

func Plugin() *EventsourcePlugin

Plugin returns the eventsource plugin from the application

func (*EventsourcePlugin) Commands

func (p *EventsourcePlugin) Commands() []*cobra.Command

Commands returns the list of CLI commands provided by the plugin

func (*EventsourcePlugin) Configure

func (p *EventsourcePlugin) Configure(configure func(*EventsourcePlugin)) *EventsourcePlugin

ConfigureEngine allows additional configuration or usage of the engine

func (*EventsourcePlugin) Deinitialize

func (p *EventsourcePlugin) Deinitialize(app *golly.Application) error

Deinitialize stops the engine

func (*EventsourcePlugin) Engine

func (p *EventsourcePlugin) Engine() *Engine

Engine returns the initialized engine

func (*EventsourcePlugin) Initialize

func (p *EventsourcePlugin) Initialize(app *golly.Application) error

Initialize sets up the engine and starts it

func (*EventsourcePlugin) Name

func (p *EventsourcePlugin) Name() string

Name returns the name of the plugin

type HandlerCache

type HandlerCache struct {
	// contains filtered or unexported fields
}

func NewHandlerCache

func NewHandlerCache() *HandlerCache

func (*HandlerCache) Reset

func (hc *HandlerCache) Reset()

type IDdProjection

type IDdProjection interface {
	ID() string
}

type InMemoryEvent

type InMemoryEvent struct {
	Event
}

InMemoryEvent wraps an Event and implements the PersistedEvent interface. It can be as simple as returning the embedded Event on Hydrate(...).

func (InMemoryEvent) GlobalVersion

func (t InMemoryEvent) GlobalVersion() int64

func (InMemoryEvent) Hydrate

func (t InMemoryEvent) Hydrate(engine *Engine) (Event, error)

Hydrate satisfies PersistedEvent. In this test store scenario, we simply return the embedded Event without transformation. In a real store, you'd parse raw data or do reflection if needed.

type InMemoryStore

type InMemoryStore struct {
	SaveFail error
	// contains filtered or unexported fields
}

InMemoryStore is an in-memory implementation of EventStore for testing purposes.

func NewInMemoryStore

func NewInMemoryStore(data ...Event) *InMemoryStore

func (*InMemoryStore) DeleteEvent

func (s *InMemoryStore) DeleteEvent(ctx context.Context, eventID uuid.UUID) error

DeleteEvent removes an event by ID.

func (*InMemoryStore) Exists

func (s *InMemoryStore) Exists(ctx context.Context, eventID uuid.UUID) (bool, error)

Exists checks if an event exists by its ID.

func (*InMemoryStore) GlobalVersion

func (s *InMemoryStore) GlobalVersion(ctx context.Context) (int64, error)

GlobalVersion returns the highest global version in this test store.

func (*InMemoryStore) IncrementGlobalVersion

func (s *InMemoryStore) IncrementGlobalVersion(ctx context.Context) (int64, error)

func (*InMemoryStore) IsNewEvent

func (s *InMemoryStore) IsNewEvent(event Event) bool

IsNewEvent checks if an event is new by aggregate ID and version.

func (*InMemoryStore) LoadEvents

func (s *InMemoryStore) LoadEvents(ctx context.Context, filters ...EventFilter) ([]PersistedEvent, error)

LoadEvents retrieves all events (optionally filtered) as []PersistedEvent.

func (*InMemoryStore) LoadEventsInBatches

func (s *InMemoryStore) LoadEventsInBatches(
	ctx context.Context,
	batchSize int,
	handler func([]PersistedEvent) error,
	filters ...EventFilter,
) error

LoadEventsInBatches loads events in ascending global version order in batches.

func (*InMemoryStore) LoadSnapshot

func (s *InMemoryStore) LoadSnapshot(ctx context.Context, aggregateType, aggregateID string) (PersistedEvent, error)

LoadSnapshot retrieves the latest snapshot for an aggregate as a PersistedEvent.

func (*InMemoryStore) Save

func (s *InMemoryStore) Save(ctx context.Context, events ...*Event) error

Save persists one or more events atomically in memory.

func (*InMemoryStore) SaveSnapshot

func (s *InMemoryStore) SaveSnapshot(ctx context.Context, agg Aggregate) error

SaveSnapshot persists a snapshot of an aggregate in memory.

type InternalStream

type InternalStream struct {
	// contains filtered or unexported fields
}

InternalStream is a simple stream for projections and in-memory subscriptions No complex ordering - just process events as they arrive

func NewInternalStream

func NewInternalStream(name string) *InternalStream

NewInternalStream creates a simple internal stream for projections

func (*InternalStream) Name

func (s *InternalStream) Name() string

Name returns the stream name

func (*InternalStream) Publish

func (s *InternalStream) Publish(ctx context.Context, topic string, evt any) error

Publish enqueues an event for immediate processing

func (*InternalStream) Start

func (s *InternalStream) Start()

Start begins processing events

func (*InternalStream) Stop

func (s *InternalStream) Stop()

Stop stops processing events

func (*InternalStream) Subscribe

func (s *InternalStream) Subscribe(topic string, handler StreamHandler)

Subscribe registers a handler for a topic

func (*InternalStream) Unsubscribe

func (s *InternalStream) Unsubscribe(topic string, handler StreamHandler)

Unsubscribe removes a handler for a topic

type Job

type Job struct {
	Ctx   context.Context
	Event Event
}

type NewRecordChecker

type NewRecordChecker interface {
	IsNewRecord() bool
}

type Option

type Option func(*Options)

Option is a function that configures a projection registration or other engine-related setup.

func WithStore

func WithStore(store EventStore) Option

WithStore configures the Engine to use the provided EventStore

func WithStreamBlockedTimeout

func WithStreamBlockedTimeout(timeout time.Duration) Option

deprecated: bus defined options

func WithStreamBufferSize

func WithStreamBufferSize(size int) Option

deprecated: bus defined options

func WithStreamName

func WithStreamName(name string) Option

deprecated: bus defined options

func WithStreamPartitions

func WithStreamPartitions(n uint32) Option

deprecated: bus defined options

func WithStreams

func WithStreams(streams ...StreamPublisher) Option

type Options

type Options struct {
	Store   EventStore
	Stream  *StreamOptions
	Streams []StreamPublisher
}

Options holds all possible configuration parameters that can be adjusted via Option functions.

type PersistedEvent

type PersistedEvent interface {
	Hydrate(*Engine) (Event, error)
}

type PluginOption

type PluginOption func(*PluginOptions)

func PluginWithEngine

func PluginWithEngine(engine *Engine) PluginOption

func PluginWithStore

func PluginWithStore(store EventStore) PluginOption

func PluginWithStreams

func PluginWithStreams(streams ...StreamPublisher) PluginOption

func PluginWithUserInfoFunc

func PluginWithUserInfoFunc(fnc func(context.Context) UserInfo) PluginOption

type PluginOptions

type PluginOptions struct {
	// contains filtered or unexported fields
}

type Projection

type Projection interface {
	HandleEvent(context.Context, Event) error

	// Position returns the last known position in the global event stream.
	Position(context.Context) int64
	SetPosition(context.Context, int64) error

	// Reset to clear state for rebuild
	Reset(context.Context) error
}

type ProjectionBase

type ProjectionBase struct {
	// contains filtered or unexported fields
}

ProjectionBase is an embeddable helper for common Projection logic.

func (*ProjectionBase) Position

func (p *ProjectionBase) Position(context.Context) int64

Position/SetPosition with atomic

func (*ProjectionBase) Reset

func (p *ProjectionBase) Reset(ctx context.Context) error

Reset sets position to -1

func (*ProjectionBase) SetPosition

func (p *ProjectionBase) SetPosition(ctx context.Context, pos int64) error

type ProjectionHandler

type ProjectionHandler func(context.Context, Event) error

type ProjectionManager

type ProjectionManager struct {
	// contains filtered or unexported fields
}

ProjectionManager manages multiple projections, each identified by a key.

func NewProjectionManager

func NewProjectionManager() *ProjectionManager

NewProjectionManager creates a ProjectionManager with an empty registry.

func (*ProjectionManager) Get

func (pm *ProjectionManager) Get(projID string) (Projection, error)

Get returns a projection by ID. Returns an error if the projection is not found. Returns nil if the projection is found. Returns nil if the projection is found.

func (*ProjectionManager) List

List returns an iterator over all registered projections with their IDs. Snapshot is taken when List() is called. Iteration order is deterministic (sorted by ID). Compatible with Go 1.23+ range-over-func using iter.Seq2.

for id, proj := range manager.List() {
    fmt.Printf("Projection %s: %v\n", id, proj)
}

func (*ProjectionManager) Rebuild

func (pm *ProjectionManager) Rebuild(ctx context.Context, eng *Engine, projID string) error

Rebuild resets a single projection, then processes all events from version 0 upward.

func (*ProjectionManager) Register

func (pm *ProjectionManager) Register(projs ...Projection)

Register adds one or more projections to the manager and indexes them by their filters.

func (*ProjectionManager) RunToEnd

func (pm *ProjectionManager) RunToEnd(ctx context.Context, eng *Engine, projID string) error

RunToEnd catches up a single projection from its current position to the end of the stream.

func (*ProjectionManager) Start

func (pm *ProjectionManager) Start()

Start begins async processing of projection events

func (*ProjectionManager) Stop

func (pm *ProjectionManager) Stop()

Stop gracefully shuts down projection processing, draining all in-flight events

type Publishable

type Publishable interface {
	Topic() string
}

type Stream

type Stream struct {
	*InternalStream
}

Stream represents a single in-memory event stream with subscriptions. This is now just a wrapper around InternalStream for backward compatibility.

func NewStream

func NewStream(opts StreamOptions) *Stream

NewStream initializes a new Stream with options

type StreamHandler

type StreamHandler func(context.Context, Event)

type StreamLifecycle

type StreamLifecycle interface {
	Start()
	Stop()
}

StreamLifecycle allows external lifecycle control; optional.

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages streams and coordinates dispatch.

func NewStreamManager

func NewStreamManager() *StreamManager

NewStreamManager initializes a new StreamManager.

func (*StreamManager) Add

func (sm *StreamManager) Add(streams ...StreamPublisher)

Add registers streams for publish fanout.

func (*StreamManager) Publish

func (sm *StreamManager) Publish(ctx context.Context, topic string, events ...Event)

Publish publishes events to all streams.

func (*StreamManager) Start

func (sm *StreamManager) Start()

Start starts streams that implement lifecycle.

func (*StreamManager) Stop

func (sm *StreamManager) Stop()

Stop stops streams that implement lifecycle.

func (*StreamManager) Subscribe

func (sm *StreamManager) Subscribe(topic string, handler StreamHandler) bool

Subscribe subscribes the handler to the first subscribable stream.

type StreamNamed

type StreamNamed interface{ Name() string }

StreamNamed provides a debug/metrics-friendly name for a stream.

type StreamOptions

type StreamOptions struct {
	Name           string
	NumPartitions  uint32
	BufferSize     int
	BlockedTimeout time.Duration
}

type StreamPublisher

type StreamPublisher interface {
	Publish(ctx context.Context, topic string, event any) error
}

StreamPublisher can publish an event to a topic on this stream (in-memory or outbound).

type StreamSubscriber

type StreamSubscriber interface {
	Subscribe(topic string, handler StreamHandler)
}

StreamSubscriber can subscribe a handler to receive events for a topic ("*" for all).

type TestEngineOptions

type TestEngineOptions struct {
	Aggregates   []AggregateDefinition
	Projections  []Projection
	Data         []any
	Streams      []StreamPublisher
	UserInfoFunc func(context.Context) UserInfo
}

type TestEventData

type TestEventData struct {
	AggregateID   string
	AggregateType string
	Data          any
	// Optional metadata
	Metadata map[string]any

	// if not set, will be set to the version
	Version int64
	// if not set, will be set to the version
	GlobalVersion int64
}

type TopicsProjection

type TopicsProjection interface {
	Topics() []string
}

type UserInfo

type UserInfo struct {
	UserID   string
	TenantID string
	Metadata map[string]any
}

Directories

Path Synopsis
eventstores
gorm module
gormstore module
examples
aggregates command
basic command
full command
middleware command
projections command
repositories
mongo module
vector module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL