Documentation
¶
Index ¶
- func Clear[T any](bus *EventBus)
- func ClearAll(bus *EventBus)
- func EventType(event any) string
- func HandlerCount[T any](bus *EventBus) int
- func HasHandlers[T any](bus *EventBus) bool
- func Publish[T any](bus *EventBus, event T)
- func PublishContext[T any](bus *EventBus, ctx context.Context, event T)
- func RegisterUpcast[From any, To any](bus *EventBus, upcast func(From) To) error
- func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error
- func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error
- func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error
- func SubscribeWithReplay[T any](ctx context.Context, bus *EventBus, subscriptionID string, handler Handler[T], ...) error
- func Unsubscribe[T any, H any](bus *EventBus, handler H) error
- type ContextHandler
- type Event
- type EventBus
- func (bus *EventBus) ClearUpcasts()
- func (bus *EventBus) ClearUpcastsForType(eventType string)
- func (bus *EventBus) GetStore() EventStore
- func (bus *EventBus) IsPersistent() bool
- func (bus *EventBus) Replay(ctx context.Context, from Offset, handler func(*StoredEvent) error) error
- func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from Offset, handler func(*StoredEvent) error) error
- func (bus *EventBus) SetAfterPublishHook(hook PublishHook)
- func (bus *EventBus) SetBeforePublishHook(hook PublishHook)
- func (bus *EventBus) SetPanicHandler(handler PanicHandler)
- func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler)
- func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler)
- func (bus *EventBus) Shutdown(ctx context.Context) error
- func (bus *EventBus) Wait()
- type EventStore
- type EventStoreStreamer
- type EventStoreSubscriber
- type Handler
- type MemoryStore
- func (m *MemoryStore) Append(ctx context.Context, event *Event) (Offset, error)
- func (m *MemoryStore) LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)
- func (m *MemoryStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
- func (m *MemoryStore) ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
- func (m *MemoryStore) SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error
- type Observability
- type Offset
- type Option
- func WithAfterPublish(hook PublishHook) Option
- func WithAfterPublishContext(hook PublishHookContext) Option
- func WithBeforePublish(hook PublishHook) Option
- func WithBeforePublishContext(hook PublishHookContext) Option
- func WithObservability(obs Observability) Option
- func WithPanicHandler(handler PanicHandler) Option
- func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option
- func WithPersistenceTimeout(timeout time.Duration) Option
- func WithReplayBatchSize(size int) Option
- func WithStore(store EventStore) Option
- func WithSubscriptionStore(store SubscriptionStore) Option
- func WithUpcast(fromType, toType string, upcast UpcastFunc) Option
- func WithUpcastErrorHandler(handler UpcastErrorHandler) Option
- type PanicHandler
- type PersistenceErrorHandler
- type PublishHook
- type PublishHookContext
- type StoredEvent
- type SubscribeOption
- type SubscriptionStore
- type TypeNamer
- type UpcastErrorHandler
- type UpcastFunc
- type Upcaster
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClearAll ¶ added in v0.5.0
func ClearAll(bus *EventBus)
ClearAll removes all handlers for all event types
func EventType ¶ added in v0.6.0
EventType returns the type name of an event. If the event implements TypeNamer, it returns the custom name. Otherwise, it returns the reflection-based package-qualified name.
This is useful for comparing with StoredEvent.Type during replay.
Example with reflection (default):
eventType := EventType(MyEvent{})
// Returns: "github.com/mypackage/MyEvent"
Example with TypeNamer:
type MyEvent struct{}
func (e MyEvent) EventTypeName() string { return "my-event.v1" }
eventType := EventType(MyEvent{})
// Returns: "my-event.v1"
Usage in replay:
bus.Replay(ctx, 0, func(event *StoredEvent) error {
if event.Type == EventType(MyEvent{}) {
// Process MyEvent
}
return nil
})
func HandlerCount ¶ added in v0.5.0
HandlerCount returns the number of handlers for events of type T
func HasHandlers ¶ added in v0.5.0
HasHandlers checks if there are handlers for events of type T
func PublishContext ¶
PublishContext publishes an event with context to all registered handlers
func RegisterUpcast ¶ added in v0.8.0
RegisterUpcast registers a type-safe upcast function
func RegisterUpcastFunc ¶ added in v0.8.0
func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error
RegisterUpcastFunc registers a raw upcast function for complex transformations
func Subscribe ¶
func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error
Subscribe registers a handler for events of type T
func SubscribeContext ¶
func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error
SubscribeContext registers a context-aware handler for events of type T
func SubscribeWithReplay ¶ added in v0.3.0
func SubscribeWithReplay[T any]( ctx context.Context, bus *EventBus, subscriptionID string, handler Handler[T], opts ...SubscribeOption, ) error
SubscribeWithReplay subscribes and replays missed events. Requires both an EventStore (for replay) and a SubscriptionStore (for tracking). If the store implements SubscriptionStore, it will be used automatically.
Context usage:
- The context is used for the replay phase (loading historical events)
- The context is used for saving subscription offsets
- The context is NOT used for the live subscription handler (which follows the bus's lifecycle)
Note: If the saved offset is OffsetOldest (""), replay starts from the beginning. The OffsetNewest ("$") constant is typically not stored and is only used for live subscriptions.
Types ¶
type ContextHandler ¶
ContextHandler is a generic event handler function that accepts context
type Event ¶ added in v0.8.4
type Event struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
Event represents an event to be stored (before it has an offset).
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is a high-performance event bus with sharded locks
func (*EventBus) ClearUpcasts ¶ added in v0.8.0
func (bus *EventBus) ClearUpcasts()
ClearUpcasts removes all registered upcasters
func (*EventBus) ClearUpcastsForType ¶ added in v0.8.0
ClearUpcastsForType removes all upcasters for a specific source type
func (*EventBus) GetStore ¶ added in v0.3.0
func (bus *EventBus) GetStore() EventStore
GetStore returns the event store (or nil if not persistent)
func (*EventBus) IsPersistent ¶ added in v0.3.0
IsPersistent returns true if persistence is enabled
func (*EventBus) Replay ¶ added in v0.3.0
func (bus *EventBus) Replay(ctx context.Context, from Offset, handler func(*StoredEvent) error) error
Replay replays events from an offset
func (*EventBus) ReplayWithUpcast ¶ added in v0.8.0
func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from Offset, handler func(*StoredEvent) error) error
ReplayWithUpcast replays events from an offset, applying upcasts before passing to handler
func (*EventBus) SetAfterPublishHook ¶ added in v0.2.0
func (bus *EventBus) SetAfterPublishHook(hook PublishHook)
SetAfterPublishHook sets the after publish hook (for backward compatibility)
func (*EventBus) SetBeforePublishHook ¶ added in v0.2.0
func (bus *EventBus) SetBeforePublishHook(hook PublishHook)
SetBeforePublishHook sets the before publish hook (for backward compatibility)
func (*EventBus) SetPanicHandler ¶
func (bus *EventBus) SetPanicHandler(handler PanicHandler)
SetPanicHandler sets the panic handler (for backward compatibility)
func (*EventBus) SetPersistenceErrorHandler ¶ added in v0.8.0
func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler)
SetPersistenceErrorHandler sets the persistence error handler (for runtime configuration)
func (*EventBus) SetUpcastErrorHandler ¶ added in v0.8.0
func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler)
SetUpcastErrorHandler sets the upcast error handler at runtime
type EventStore ¶ added in v0.3.0
type EventStore interface {
// Append stores an event and returns its assigned offset.
// The store is responsible for generating unique, monotonically increasing offsets.
Append(ctx context.Context, event *Event) (Offset, error)
// Read returns events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The limit parameter controls max events returned (0 = no limit).
// Returns the events, the offset to use for the next read, and any error.
Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
}
EventStore defines the core interface for persisting events. This is a minimal interface with just 2 methods for basic event storage. Additional capabilities are provided through optional interfaces.
type EventStoreStreamer ¶ added in v0.9.2
type EventStoreStreamer interface {
// ReadStream returns an iterator yielding events starting after the given offset.
// Use OffsetOldest to read from the beginning.
// The iterator checks ctx.Done() before each yield and returns ctx.Err() when cancelled.
// A yielded error terminates iteration.
ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
}
EventStoreStreamer is an optional interface for memory-efficient streaming. When implemented, the Replay method will automatically use streaming.
Implementation notes:
- Database-backed stores should use cursor-based iteration to minimize memory
- In-memory stores may need to take a snapshot to avoid holding locks during iteration, trading memory for deadlock safety (see MemoryStore.ReadStream for an example)
type EventStoreSubscriber ¶ added in v0.10.0
type EventStoreSubscriber interface {
// Subscribe starts a live subscription from the given offset.
// Returns a channel that yields events as they arrive.
// Call the returned cancel function to stop the subscription and close the channel.
Subscribe(ctx context.Context, from Offset) (<-chan *StoredEvent, func(), error)
}
EventStoreSubscriber is an optional interface for stores that support live subscriptions. This enables real-time event streaming for remote stores.
type MemoryStore ¶ added in v0.3.0
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is a simple in-memory implementation of EventStore and SubscriptionStore.
func NewMemoryStore ¶ added in v0.3.0
func NewMemoryStore() *MemoryStore
NewMemoryStore creates a new in-memory event store
func (*MemoryStore) LoadOffset ¶ added in v0.10.0
LoadOffset implements SubscriptionStore
func (*MemoryStore) Read ¶ added in v0.10.0
func (m *MemoryStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
Read implements EventStore
func (*MemoryStore) ReadStream ¶ added in v0.10.0
func (m *MemoryStore) ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
ReadStream implements EventStoreStreamer for memory-efficient event iteration. Note: This takes a filtered snapshot of matching events to avoid holding the lock during iteration, which could cause deadlocks if handlers call other store methods.
func (*MemoryStore) SaveOffset ¶ added in v0.10.0
SaveOffset implements SubscriptionStore
type Observability ¶ added in v0.8.3
type Observability interface {
// OnPublishStart is called when an event is about to be published.
// Returns a context that will be passed to handlers and subsequent hooks.
// The event parameter allows implementations to extract custom attributes.
OnPublishStart(ctx context.Context, eventType string, event any) context.Context
// OnPublishComplete is called after all synchronous handlers complete.
// Note: This is called before async handlers complete.
OnPublishComplete(ctx context.Context, eventType string)
// OnHandlerStart is called before a handler executes.
// Returns a context for the handler execution.
OnHandlerStart(ctx context.Context, eventType string, async bool) context.Context
// OnHandlerComplete is called after a handler completes.
// The error parameter is non-nil if the handler panicked.
OnHandlerComplete(ctx context.Context, duration time.Duration, err error)
// OnPersistStart is called before persisting an event.
OnPersistStart(ctx context.Context, eventType string, position int64) context.Context
// OnPersistComplete is called after persisting an event.
OnPersistComplete(ctx context.Context, duration time.Duration, err error)
}
Observability is an optional interface for metrics and tracing. Implementations can track event publishing, handler execution, and errors.
This interface is designed to be zero-cost when not used - if no observability is configured, there is no performance overhead.
The context returned from each method can be used to propagate trace spans and other context-specific data through the event processing pipeline.
Example implementation: see github.com/jilio/ebu/otel package for OpenTelemetry integration.
type Offset ¶ added in v0.10.0
type Offset string
Offset represents an opaque position in an event stream. Implementations define the format (e.g., "123", "abc_456", timestamp-based). Offsets are lexicographically comparable within the same store.
type Option ¶ added in v0.5.0
type Option func(*EventBus)
Option is a function that configures the EventBus
func WithAfterPublish ¶ added in v0.5.0
func WithAfterPublish(hook PublishHook) Option
WithAfterPublish sets a hook that's called after publishing events
func WithAfterPublishContext ¶ added in v0.9.1
func WithAfterPublishContext(hook PublishHookContext) Option
WithAfterPublishContext sets a context-aware hook that's called after publishing events
func WithBeforePublish ¶ added in v0.5.0
func WithBeforePublish(hook PublishHook) Option
WithBeforePublish sets a hook that's called before publishing events
func WithBeforePublishContext ¶ added in v0.9.1
func WithBeforePublishContext(hook PublishHookContext) Option
WithBeforePublishContext sets a context-aware hook that's called before publishing events
func WithObservability ¶ added in v0.8.3
func WithObservability(obs Observability) Option
WithObservability sets the observability implementation for metrics and tracing
func WithPanicHandler ¶ added in v0.5.0
func WithPanicHandler(handler PanicHandler) Option
WithPanicHandler sets a panic handler for the event bus
func WithPersistenceErrorHandler ¶ added in v0.8.0
func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option
WithPersistenceErrorHandler sets the error handler for persistence failures
func WithPersistenceTimeout ¶ added in v0.8.0
WithPersistenceTimeout sets the timeout for persistence operations
func WithReplayBatchSize ¶ added in v0.10.0
WithReplayBatchSize sets the batch size for Replay operations. This controls how many events are read at a time when using a store that doesn't implement EventStoreStreamer. Default is 100 if not set or set to 0.
func WithStore ¶ added in v0.3.0
func WithStore(store EventStore) Option
WithStore enables persistence with the given store
func WithSubscriptionStore ¶ added in v0.10.0
func WithSubscriptionStore(store SubscriptionStore) Option
WithSubscriptionStore enables subscription position tracking
func WithUpcast ¶ added in v0.8.0
func WithUpcast(fromType, toType string, upcast UpcastFunc) Option
WithUpcast adds an upcast function during bus creation
func WithUpcastErrorHandler ¶ added in v0.8.0
func WithUpcastErrorHandler(handler UpcastErrorHandler) Option
WithUpcastErrorHandler sets the error handler for upcast failures
type PanicHandler ¶
PanicHandler is called when a handler panics
type PersistenceErrorHandler ¶ added in v0.8.0
PersistenceErrorHandler is called when event persistence fails
type PublishHook ¶ added in v0.2.0
PublishHook is called when an event is published
type PublishHookContext ¶ added in v0.9.1
PublishHookContext is called when an event is published (with context)
type StoredEvent ¶ added in v0.3.0
type StoredEvent struct {
Offset Offset `json:"offset"`
Type string `json:"type"`
Data json.RawMessage `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
StoredEvent represents an event that has been persisted with an offset.
type SubscribeOption ¶
type SubscribeOption func(*internalHandler)
SubscribeOption configures a subscription
func Sequential ¶ added in v0.5.0
func Sequential() SubscribeOption
Sequential ensures the handler executes sequentially (with mutex)
func WithFilter ¶ added in v0.7.0
func WithFilter[T any](predicate func(T) bool) SubscribeOption
WithFilter configures the handler to only receive events that match the predicate
type SubscriptionStore ¶ added in v0.10.0
type SubscriptionStore interface {
// SaveOffset persists the current offset for a subscription.
SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error
// LoadOffset retrieves the last saved offset for a subscription.
// Returns OffsetOldest if the subscription has no saved offset.
LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)
}
SubscriptionStore tracks subscription progress separately from event storage. This interface is optional and enables resumable subscriptions.
type TypeNamer ¶ added in v0.8.2
type TypeNamer interface {
EventTypeName() string
}
TypeNamer is an optional interface that events can implement to provide their own type name. This gives explicit control over event type naming, which is useful for:
- Stable type names across package refactoring
- Custom versioning schemes (e.g., "UserCreated.v2")
- Compatibility with external event stores
If an event implements TypeNamer, EventType() will use the provided name instead of the reflection-based package-qualified name.
Example:
type UserCreatedEvent struct {
UserID string
}
func (e UserCreatedEvent) EventTypeName() string {
return "user.created.v1"
}
type UpcastErrorHandler ¶ added in v0.8.0
type UpcastErrorHandler func(eventType string, data json.RawMessage, err error)
UpcastErrorHandler is called when an upcast operation fails
type UpcastFunc ¶ added in v0.8.0
type UpcastFunc func(data json.RawMessage) (json.RawMessage, string, error)
UpcastFunc transforms event data from one version to another. It receives the raw JSON data and returns transformed data with the new type name.
type Upcaster ¶ added in v0.8.0
type Upcaster struct {
FromType string // Source event type
ToType string // Target event type
Upcast UpcastFunc // Transformation function
}
Upcaster represents a transformation from one event type to another
Directories
¶
| Path | Synopsis |
|---|---|
|
otel
module
|
|
|
Package state implements the Durable Streams State Protocol for ebu.
|
Package state implements the Durable Streams State Protocol for ebu. |
|
stores
|
|
|
durablestream
module
|
|
|
sqlite
module
|