Documentation ¶
Index ¶
- func HydrateState[T Handler](t *testing.T, state T, contents ...Content) T
- func VerifyEvents(t *testing.T, events []Content, expected ...any) bool
- type Config
- type Content
- type Event
- type EventBus
- type EventUpgrade
- type EventUpgradeFunc
- type Handler
- type HandlerFunc
- type InMemoryEventBus
- func (bus *InMemoryEventBus) Close() error
- func (bus *InMemoryEventBus) GetSubscriberIDs(ctx context.Context, streamType string) ([]string, error)
- func (bus *InMemoryEventBus) Subscribe(ctx context.Context, streamType, subscriberID string, handler Handler) error
- func (bus *InMemoryEventBus) Write(ctx context.Context, streamType string, events iter.Seq2[Event, error]) error
- func (bus *InMemoryEventBus) WriteTo(ctx context.Context, streamType string, events iter.Seq2[Event, error], ...) error
- type Logger
- type Option
- func WithDefaultSlog() Option
- func WithEventBus(bus EventBus) Option
- func WithEventUpgrades(streamType string, upgrades ...EventUpgrade) Option
- func WithEvents(streamType string, contentTypes []Content) Option
- func WithLogger(logger Logger) Option
- func WithNoopLogger() Option
- func WithSlog(log *slog.Logger) Option
- type ReadWriter
- type Reader
- type Storage
- type Store
- func (s *Store) Close() error
- func (s *Store) GetSubscriberIDs(ctx context.Context, streamType string) ([]string, error)
- func (s *Store) Open(ctx context.Context, streamType string, streamID string) Stream
- func (s *Store) OpenFrom(ctx context.Context, streamType string, streamID string, eventNumber int64) Stream
- func (s *Store) Project(ctx context.Context, streamType, streamID string, handler Handler) (err error)
- func (s *Store) Start(ctx context.Context) error
- func (s *Store) Subscribe(ctx context.Context, streamType string, subscriberID string, handler Handler) error
- type Stream
- type StreamReference
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HydrateState ¶
HydrateState is a test helper meant to make it easy to hydrate a state using event data.
Types ¶
type Content ¶
type Content interface {
EventName() string
}
Content is the application-specific data model used in an Event.
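As an illustration, any struct with an EventName method satisfies Content. The sketch below redeclares the interface locally so it compiles on its own; OrderPlaced and its fields are hypothetical names, not part of this package.

```go
package main

import "fmt"

// Content mirrors the interface documented above so this sketch is self-contained.
type Content interface {
	EventName() string
}

// OrderPlaced is a hypothetical application event; its name and fields are assumptions.
type OrderPlaced struct {
	OrderID string
	Total   int64
}

// EventName returns the name that identifies this kind of content.
func (OrderPlaced) EventName() string { return "OrderPlaced" }

// Compile-time check that OrderPlaced satisfies Content.
var _ Content = OrderPlaced{}

func main() {
	var c Content = OrderPlaced{OrderID: "o-1", Total: 4200}
	fmt.Println(c.EventName())
}
```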
type Event ¶
type Event struct {
// StreamID is the ID of the stream the event belongs to.
StreamID string
// StreamType is the type of the stream the event belongs to.
StreamType string
// EventNumber is the number of the event in the stream.
EventNumber int64
// EventTime is the time the event was recorded in the Store.
EventTime time.Time
// Content is the actual content of the event. Expected to be a struct defined
// by the application.
Content Content
// StoreEventID is the ID of the event assigned by the Store.
// The StoreEventID is a UUIDv7 with the underlying time matching the EventTime.
StoreEventID string
// StoreStreamID is the ID of the stream assigned by the Store
// The StoreStreamID is a UUIDv7 with the underlying time matching the EventTime
// of the first event in the stream.
StoreStreamID string
}
Event is a combination of the metadata and content of a business event in the system. It is part of a Stream that makes up the current state of a business entity.
type EventBus ¶
type EventBus interface {
// Write should write the events to all subscriptions
Write(ctx context.Context, streamType string, events iter.Seq2[Event, error]) error
// Subscribe registers a Handler under its subscriberID
Subscribe(ctx context.Context, streamType string, subscriberID string, handler Handler) error
// GetSubscriberIDs returns a list of all subscription IDs for the streamType
GetSubscriberIDs(ctx context.Context, streamType string) ([]string, error)
// WriteTo calls each Handler registered under the given subscriberIDs with the events
WriteTo(ctx context.Context, streamType string, events iter.Seq2[Event, error], subscriberIDs ...string) error
Close() error
}
EventBus is responsible for distributing Events to all subscribing Handlers after they are written to the Storage.
type EventUpgrade ¶
type EventUpgrade interface {
Upgrade(ctx context.Context, events iter.Seq2[Event, error]) iter.Seq2[Event, error]
}
EventUpgrade upgrades events into a new version. The events will all be from the same Stream and arrive ordered by EventNumber.
The EventUpgrade is used when reading a Stream and when an Event is published to subscribing Handlers. As a result, only the events that are in flight go through the EventUpgrade. Example: if you open a Stream from EventNumber 3, Events 1 and 2 will not be in the events sequence.
type EventUpgradeFunc ¶
type HandlerFunc ¶
HandlerFunc is a convenience type to allow an inline func to act as a Handler.
type InMemoryEventBus ¶
type InMemoryEventBus struct {
// contains filtered or unexported fields
}
func NewInMemoryEventBus ¶
func NewInMemoryEventBus() *InMemoryEventBus
func (*InMemoryEventBus) Close ¶
func (bus *InMemoryEventBus) Close() error
func (*InMemoryEventBus) GetSubscriberIDs ¶
type Option ¶
type Option func(*Config)
func WithDefaultSlog ¶
func WithDefaultSlog() Option
func WithEventBus ¶
func WithEventUpgrades ¶
func WithEventUpgrades(streamType string, upgrades ...EventUpgrade) Option
func WithEvents ¶
func WithLogger ¶
func WithNoopLogger ¶
func WithNoopLogger() Option
type ReadWriter ¶
type Reader ¶
type Reader interface {
Read(ctx context.Context, streamType string, streamID string, eventNumber int64) iter.Seq2[Event, error]
}
Reader allows getting a sequence of Events for a StreamType and StreamID.
type Storage ¶
type Storage interface {
// Read the events of a streamType with the streamID from eventNumber
Read(ctx context.Context, streamType string, streamID string, eventNumber int64) iter.Seq2[Event, error]
// Write writes the events to the store.
// All of the events must be written by sequence.
// They should all be written or fully fail.
Write(ctx context.Context, streamType string, events iter.Seq2[Event, error]) error
// StartPublish should begin the process where newly written events are published to the Writer.
// The publishing must be cancelled with the context.
StartPublish(ctx context.Context, w Writer) error
// Register allows the Storage to Unmarshal multiple shapes of Content for a streamType.
// It is considered an error if a Storage contains a shape of Content that has not been registered.
Register(streamType string, types ...Content) error
// GetStreamReferences returns a list of references to streams for the given streamType.
// The returned list is ordered by the StoreStreamID and limited in size by the limit.
// Use the last StreamReference.StoreStreamID as the pagination token.
GetStreamReferences(ctx context.Context, streamType string, storeStreamID string, limit int64) iter.Seq2[StreamReference, error]
}
Storage abstracts the persistence of a Store.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) GetSubscriberIDs ¶
GetSubscriberIDs returns a list of all subscriber IDs registered for a given streamType.
func (*Store) Open ¶
Open opens a Stream by its type and ID. The Stream is opened at the start and must be closed.
func (*Store) OpenFrom ¶
func (s *Store) OpenFrom(ctx context.Context, streamType string, streamID string, eventNumber int64) Stream
OpenFrom opens a Stream so the first event read will be eventNumber + 1. The Stream must be closed.
func (*Store) Project ¶
func (s *Store) Project(ctx context.Context, streamType, streamID string, handler Handler) (err error)
Project projects all Events of the stream identified by its type and ID onto a Handler.
type Stream ¶
type Stream interface {
// Project iterates over all events in the stream and calls the handler for each event.
// The Stream will stop projecting if the handler returns an error.
Project(handler Handler) error
// All returns an iter.Seq2 of all events in the stream.
// The returned iter.Seq2 will stop and return an error if there was an error
// reading the events.
// Calling this method twice will return the same iter.Seq2.
All() iter.Seq2[Event, error]
// Write writes the given events to the stream.
// The Events will be written in the order they are given and starting
// at the most recent event number + 1.
Write(events ...Content) error
// Position returns the current position of the stream.
Position() int64
Close() error
}
Stream is a sequence of Events that in combination represent the state of a business entity. The Stream can be written and read from, which enables applications to alter and get the state.
type StreamReference ¶
StreamReference is used as a reference to a stream. It is primarily used as a convenience for the Storage interface.