eventbus

package module
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2025 License: MIT Imports: 9 Imported by: 3

README

ebu (Event BUs)

GoDoc Test and Coverage Go Coverage Go Report Card

A lightweight, type-safe event bus for Go with generics support. Build decoupled applications with compile-time type safety.

ebu stands for Event BUs - a simple, powerful event bus implementation for Go.

Features

  • 🔒 Type-safe - Full compile-time type safety with generics
  • Fast - Zero allocations in hot paths, optimized for performance
  • 🔄 Async support - Built-in async handlers with optional sequential processing
  • 🎯 Simple API - Clean, intuitive API with options pattern
  • 🧵 Thread-safe - Safe for concurrent use across goroutines
  • 🌐 Context support - First-class context support for cancellation and tracing
  • 🛡️ Panic recovery - Handlers are isolated from each other's panics
  • 🚀 Zero dependencies - Pure Go standard library (core package)
  • 💾 Event persistence - Built-in support for event storage and replay
  • 🌍 Remote storage - Native support for remote backends like durable-streams
  • 🔄 Event upcasting - Seamless event schema migration and versioning
  • 100% test coverage - Thoroughly tested for reliability

Installation

go get github.com/jilio/ebu

Quick Start

package main

import (
    "fmt"
    "time"

    eventbus "github.com/jilio/ebu"
)

// Define your event types
type UserLoginEvent struct {
    UserID    string
    Timestamp time.Time
}

type OrderCreatedEvent struct {
    OrderID string
    Amount  float64
}

func main() {
    // Create a new event bus
    bus := eventbus.New()

    // Subscribe to events with type-safe handlers
    eventbus.Subscribe(bus, func(event UserLoginEvent) {
        fmt.Printf("User %s logged in at %v\n", event.UserID, event.Timestamp)
    })

    eventbus.Subscribe(bus, func(event OrderCreatedEvent) {
        fmt.Printf("Order %s created for $%.2f\n", event.OrderID, event.Amount)
    })

    // Publish events - compile-time type safety!
    eventbus.Publish(bus, UserLoginEvent{
        UserID:    "user123",
        Timestamp: time.Now(),
    })

    eventbus.Publish(bus, OrderCreatedEvent{
        OrderID: "order456",
        Amount:  99.99,
    })
}

Core API

Subscribe and Publish
// Simple subscription
eventbus.Subscribe(bus, func(event UserEvent) {
    // Handle event
})

// With options
eventbus.Subscribe(bus, func(event EmailEvent) {
    sendEmail(event)
}, eventbus.Async(), eventbus.Once())

// Publish events
eventbus.Publish(bus, UserEvent{UserID: "123"})
Async Processing
// Parallel async processing (default)
eventbus.Subscribe(bus, func(event EmailEvent) {
    sendEmail(event) // Each email sent in parallel
}, eventbus.Async())

// Sequential async processing (preserves order)
eventbus.Subscribe(bus, func(event PaymentEvent) {
    processPayment(event) // Processed one at a time
}, eventbus.Async(), eventbus.Sequential())

// Wait for all async handlers
bus.Wait()
Graceful Shutdown

Shutdown the event bus gracefully, waiting for async handlers to complete with timeout support:

// Shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := bus.Shutdown(ctx); err != nil {
    log.Printf("Shutdown timed out: %v", err)
}

The Shutdown method:

  • Waits for all async handlers to complete
  • Respects context timeout and cancellation
  • Returns context.DeadlineExceeded if handlers don't finish in time
  • Returns nil on successful graceful shutdown
Context Support
// Context-aware handlers
eventbus.SubscribeContext(bus, func(ctx context.Context, event RequestEvent) {
    traceID := ctx.Value("traceID")
    // Handle with context
})

// Publish with context
ctx := context.WithTimeout(context.Background(), 5*time.Second)
eventbus.PublishContext(bus, ctx, RequestEvent{Path: "/api/users"})
Event Filtering
eventbus.Subscribe(bus, func(event PriceEvent) {
    fmt.Printf("Alert: Price changed %.2f%%\n", event.Change)
}, eventbus.WithFilter(func(event PriceEvent) bool {
    return math.Abs(event.Change) > 5.0 // Only large changes
}))
Handler Management
// Check for handlers
if eventbus.HasHandlers[UserEvent](bus) {
    eventbus.Publish(bus, UserEvent{})
}

// Unsubscribe a handler
handler := func(event UserEvent) { /* ... */ }
eventbus.Subscribe(bus, handler)
eventbus.Unsubscribe(bus, handler)

// Clear all handlers for a type
eventbus.Clear[UserEvent](bus)

// Clear all handlers
bus.ClearAll()

Advanced Features

Event Persistence

Store and replay events for event sourcing, audit logs, and resumable subscriptions:

// Create persistent bus with in-memory store
store := eventbus.NewMemoryStore()
bus := eventbus.New(eventbus.WithStore(store))

// Events are automatically persisted
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})

// Replay events from the beginning
bus.Replay(ctx, eventbus.OffsetOldest, func(event *eventbus.StoredEvent) error {
    // Process stored event
    return nil
})

// Subscribe with automatic offset tracking
eventbus.SubscribeWithReplay(bus, "email-sender",
    func(event EmailEvent) {
        sendEmail(event)
        // Offset saved automatically after success
    })

See Persistence Guide for custom stores and advanced patterns.

Remote Storage

Use remote storage backends for distributed event persistence. ebu supports Durable Streams - an HTTP protocol for reliable, resumable, real-time data streaming developed by Electric:

import (
    eventbus "github.com/jilio/ebu"
    "github.com/jilio/ebu/stores/durablestream"
)

// Connect to a durable-streams server
store, err := durablestream.New(
    "http://localhost:4437/v1/stream",  // server base URL
    "mystream",                          // stream name
    durablestream.WithTimeout(30*time.Second),
)
if err != nil {
    log.Fatal(err)
}

// Use with event bus - same API as local storage
bus := eventbus.New(eventbus.WithStore(store))

// Events are now persisted to the remote durable-streams server
eventbus.Publish(bus, OrderCreatedEvent{OrderID: "123", Amount: 99.99})

Available storage backends:

  • MemoryStore - Built-in in-memory store for development
  • SQLite - stores/sqlite - Persistent local storage
  • Durable-Streams - stores/durablestream - Remote HTTP-based storage (protocol spec)

See Persistence Guide for all storage options.

Observability

Add metrics and distributed tracing with OpenTelemetry:

import (
    eventbus "github.com/jilio/ebu"
    "github.com/jilio/ebu/otel"
)

// Create observability implementation
obs, err := otel.New(
    otel.WithTracerProvider(tracerProvider),
    otel.WithMeterProvider(meterProvider),
)

// Create bus with observability
bus := eventbus.New(eventbus.WithObservability(obs))

// Events, handlers, and persistence are automatically tracked
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})

The otel package provides:

  • Metrics: Event counts, handler duration, error rates, persistence metrics
  • Tracing: Distributed tracing with spans for publish, handlers, and persistence
  • Zero overhead: Optional - no performance impact if not used
  • Vendor-neutral: Built on OpenTelemetry standards

See examples/observability for a complete example.

Event Upcasting

Migrate event schemas seamlessly without breaking existing handlers:

// V1 event
type UserCreatedV1 struct {
    UserID string
    Name   string
}

// V2 event with split name
type UserCreatedV2 struct {
    UserID    string
    FirstName string
    LastName  string
}

// Register upcast transformation
eventbus.RegisterUpcast(bus, func(v1 UserCreatedV1) UserCreatedV2 {
    parts := strings.Split(v1.Name, " ")
    return UserCreatedV2{
        UserID:    v1.UserID,
        FirstName: parts[0],
        LastName:  strings.Join(parts[1:], " "),
    }
})

// Old events automatically transformed when replayed
eventbus.SubscribeWithReplay(bus, "processor", func(event UserCreatedV2) {
    // Receives V2 format even for old V1 events
})

Upcasting supports:

  • Automatic chain resolution (V1→V2→V3)
  • Circular dependency detection
  • Type-safe transformations
  • Error handling hooks
Panic Recovery

Handlers are isolated - one panic won't affect others:

bus.SetPanicHandler(func(event any, handlerType reflect.Type, panicValue any) {
    log.Printf("Handler panic: %v", panicValue)
})

eventbus.Subscribe(bus, func(e Event) { panic("error") })
eventbus.Subscribe(bus, func(e Event) { /* Still runs! */ })
Global Hooks

Intercept all events for logging, metrics, or tracing:

bus.SetBeforePublishHook(func(eventType reflect.Type, event any) {
    log.Printf("Publishing %s", eventType.Name())
})

bus.SetAfterPublishHook(func(eventType reflect.Type, event any) {
    metrics.Increment("events." + eventType.Name())
})
Custom Event Type Names

Control event type naming explicitly with the TypeNamer interface for stable names across refactoring:

type UserCreatedEvent struct {
    UserID string
}

// Implement TypeNamer for explicit type control
func (e UserCreatedEvent) EventTypeName() string {
    return "user.created.v1"
}

// Now EventType() returns "user.created.v1" instead of package-qualified name
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})

Benefits:

  • Stable event names across package reorganization
  • Version control for event schema evolution
  • External compatibility with other event systems

See TypeNamer examples for versioning and migration patterns.

Documentation

Storage Backends

Backend Package Description
MemoryStore github.com/jilio/ebu In-memory store for development/testing
SQLite github.com/jilio/ebu/stores/sqlite Local persistent storage with WAL mode
Durable-Streams github.com/jilio/ebu/stores/durablestream Remote HTTP-based storage

See Persistence Guide for detailed usage.

State Protocol

The optional state package implements the Durable Streams State Protocol for database-style state synchronization:

import (
    eventbus "github.com/jilio/ebu"
    "github.com/jilio/ebu/state"
)

// Define entity type
type User struct {
    Name  string `json:"name"`
    Email string `json:"email"`
}

// Create and publish state changes
bus := eventbus.New(eventbus.WithStore(eventbus.NewMemoryStore()))

insertMsg, _ := state.Insert("user:1", User{Name: "Alice", Email: "[email protected]"})
eventbus.Publish(bus, insertMsg)

updateMsg, _ := state.Update("user:1", User{Name: "Alice Smith"}, state.WithTxID("tx-123"))
eventbus.Publish(bus, updateMsg)

// Materialize state from events
mat := state.NewMaterializer()
users := state.NewTypedCollection[User](state.NewMemoryStore[User]())
state.RegisterCollection(mat, users)

mat.Replay(ctx, bus, eventbus.OffsetOldest)

// Access materialized state
user, ok := users.Get("user:1")  // User{Name: "Alice Smith", ...}

Features:

  • Type-safe helpers: Insert, Update, Delete with Go generics
  • Options pattern: WithTxID, WithTimestamp, WithEntityType
  • Materializer: Build typed state from event streams
  • Control messages: SnapshotStart, SnapshotEnd, Reset
  • JSON interoperability: Compatible with durable-streams ecosystem

Best Practices

  1. Define clear event types - Use descriptive structs with meaningful fields
  2. Keep events immutable - Don't modify events after publishing
  3. Handle errors gracefully - Prefer returning errors over panicking
  4. Use async for I/O - Keep synchronous handlers fast
  5. Leverage context - Use PublishContext for cancellable operations
  6. Set panic handlers - Monitor and log handler failures in production
  7. Test concurrency - The bus is thread-safe, but test your handlers

Performance

  • Type-based routing with zero reflection for direct handlers
  • Zero allocations in hot paths
  • Efficient sharding reduces lock contention
  • Async handlers run in separate goroutines

Contributing

Contributions are welcome! Submit a Pull Request or open an Issue for bugs, features, or improvements.

License

MIT License - see LICENSE file for details.

Support

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Clear

func Clear[T any](bus *EventBus)

Clear removes all handlers for events of type T

func ClearAll added in v0.5.0

func ClearAll(bus *EventBus)

ClearAll removes all handlers for all event types

func EventType added in v0.6.0

func EventType(event any) string

EventType returns the type name of an event. If the event implements TypeNamer, it returns the custom name. Otherwise, it returns the reflection-based package-qualified name.

This is useful for comparing with StoredEvent.Type during replay.

Example with reflection (default):

eventType := EventType(MyEvent{})
// Returns: "github.com/mypackage/MyEvent"

Example with TypeNamer:

type MyEvent struct{}
func (e MyEvent) EventTypeName() string { return "my-event.v1" }
eventType := EventType(MyEvent{})
// Returns: "my-event.v1"

Usage in replay:

bus.Replay(ctx, 0, func(event *StoredEvent) error {
    if event.Type == EventType(MyEvent{}) {
        // Process MyEvent
    }
    return nil
})

func HandlerCount added in v0.5.0

func HandlerCount[T any](bus *EventBus) int

HandlerCount returns the number of handlers for events of type T

func HasHandlers added in v0.5.0

func HasHandlers[T any](bus *EventBus) bool

HasHandlers checks if there are handlers for events of type T

func Publish

func Publish[T any](bus *EventBus, event T)

Publish publishes an event to all registered handlers

func PublishContext

func PublishContext[T any](bus *EventBus, ctx context.Context, event T)

PublishContext publishes an event with context to all registered handlers

func RegisterUpcast added in v0.8.0

func RegisterUpcast[From any, To any](bus *EventBus, upcast func(From) To) error

RegisterUpcast registers a type-safe upcast function

func RegisterUpcastFunc added in v0.8.0

func RegisterUpcastFunc(bus *EventBus, fromType, toType string, upcast UpcastFunc) error

RegisterUpcastFunc registers a raw upcast function for complex transformations

func Subscribe

func Subscribe[T any](bus *EventBus, handler Handler[T], opts ...SubscribeOption) error

Subscribe registers a handler for events of type T

func SubscribeContext

func SubscribeContext[T any](bus *EventBus, handler ContextHandler[T], opts ...SubscribeOption) error

SubscribeContext registers a context-aware handler for events of type T

func SubscribeWithReplay added in v0.3.0

func SubscribeWithReplay[T any](
	ctx context.Context,
	bus *EventBus,
	subscriptionID string,
	handler Handler[T],
	opts ...SubscribeOption,
) error

SubscribeWithReplay subscribes and replays missed events. Requires both an EventStore (for replay) and a SubscriptionStore (for tracking). If the store implements SubscriptionStore, it will be used automatically.

Context usage:

  • The context is used for the replay phase (loading historical events)
  • The context is used for saving subscription offsets
  • The context is NOT used for the live subscription handler (which follows the bus's lifecycle)

Note: If the saved offset is OffsetOldest (""), replay starts from the beginning. The OffsetNewest ("$") constant is typically not stored and is only used for live subscriptions.

func Unsubscribe

func Unsubscribe[T any, H any](bus *EventBus, handler H) error

Unsubscribe removes a handler for events of type T

Types

type ContextHandler

type ContextHandler[T any] func(context.Context, T)

ContextHandler is a generic event handler function that accepts context

type Event added in v0.8.4

type Event struct {
	Type      string          `json:"type"`
	Data      json.RawMessage `json:"data"`
	Timestamp time.Time       `json:"timestamp"`
}

Event represents an event to be stored (before it has an offset).

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is a high-performance event bus with sharded locks

func New

func New(opts ...Option) *EventBus

New creates a new EventBus with sharded locks for better performance

func (*EventBus) ClearUpcasts added in v0.8.0

func (bus *EventBus) ClearUpcasts()

ClearUpcasts removes all registered upcasters

func (*EventBus) ClearUpcastsForType added in v0.8.0

func (bus *EventBus) ClearUpcastsForType(eventType string)

ClearUpcastsForType removes all upcasters for a specific source type

func (*EventBus) GetStore added in v0.3.0

func (bus *EventBus) GetStore() EventStore

GetStore returns the event store (or nil if not persistent)

func (*EventBus) IsPersistent added in v0.3.0

func (bus *EventBus) IsPersistent() bool

IsPersistent returns true if persistence is enabled

func (*EventBus) Replay added in v0.3.0

func (bus *EventBus) Replay(ctx context.Context, from Offset, handler func(*StoredEvent) error) error

Replay replays events from an offset

func (*EventBus) ReplayWithUpcast added in v0.8.0

func (bus *EventBus) ReplayWithUpcast(ctx context.Context, from Offset, handler func(*StoredEvent) error) error

ReplayWithUpcast replays events from an offset, applying upcasts before passing to handler

func (*EventBus) SetAfterPublishHook added in v0.2.0

func (bus *EventBus) SetAfterPublishHook(hook PublishHook)

SetAfterPublishHook sets the after publish hook (for backward compatibility)

func (*EventBus) SetBeforePublishHook added in v0.2.0

func (bus *EventBus) SetBeforePublishHook(hook PublishHook)

SetBeforePublishHook sets the before publish hook (for backward compatibility)

func (*EventBus) SetPanicHandler

func (bus *EventBus) SetPanicHandler(handler PanicHandler)

SetPanicHandler sets the panic handler (for backward compatibility)

func (*EventBus) SetPersistenceErrorHandler added in v0.8.0

func (bus *EventBus) SetPersistenceErrorHandler(handler PersistenceErrorHandler)

SetPersistenceErrorHandler sets the persistence error handler (for runtime configuration)

func (*EventBus) SetUpcastErrorHandler added in v0.8.0

func (bus *EventBus) SetUpcastErrorHandler(handler UpcastErrorHandler)

SetUpcastErrorHandler sets the upcast error handler at runtime

func (*EventBus) Shutdown added in v0.8.5

func (bus *EventBus) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the event bus, waiting for async handlers to complete. It respects the context timeout/cancellation. If the store implements io.Closer, its Close() method will be called after handlers complete.

func (*EventBus) Wait added in v0.5.0

func (bus *EventBus) Wait()

Wait blocks until all async handlers complete

type EventStore added in v0.3.0

type EventStore interface {
	// Append stores an event and returns its assigned offset.
	// The store is responsible for generating unique, monotonically increasing offsets.
	Append(ctx context.Context, event *Event) (Offset, error)

	// Read returns events starting after the given offset.
	// Use OffsetOldest to read from the beginning.
	// The limit parameter controls max events returned (0 = no limit).
	// Returns the events, the offset to use for the next read, and any error.
	Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)
}

EventStore defines the core interface for persisting events. This is a minimal interface with just 2 methods for basic event storage. Additional capabilities are provided through optional interfaces.

type EventStoreStreamer added in v0.9.2

type EventStoreStreamer interface {
	// ReadStream returns an iterator yielding events starting after the given offset.
	// Use OffsetOldest to read from the beginning.
	// The iterator checks ctx.Done() before each yield and returns ctx.Err() when cancelled.
	// A yielded error terminates iteration.
	ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]
}

EventStoreStreamer is an optional interface for memory-efficient streaming. When implemented, the Replay method will automatically use streaming.

Implementation notes:

  • Database-backed stores should use cursor-based iteration to minimize memory
  • In-memory stores may need to take a snapshot to avoid holding locks during iteration, trading memory for deadlock safety (see MemoryStore.ReadStream for an example)

type EventStoreSubscriber added in v0.10.0

type EventStoreSubscriber interface {
	// Subscribe starts a live subscription from the given offset.
	// Returns a channel that yields events as they arrive.
	// Call the returned cancel function to stop the subscription and close the channel.
	Subscribe(ctx context.Context, from Offset) (<-chan *StoredEvent, func(), error)
}

EventStoreSubscriber is an optional interface for stores that support live subscriptions. This enables real-time event streaming for remote stores.

type Handler

type Handler[T any] func(T)

Handler is a generic event handler function

type MemoryStore added in v0.3.0

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a simple in-memory implementation of EventStore and SubscriptionStore.

func NewMemoryStore added in v0.3.0

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new in-memory event store

func (*MemoryStore) Append added in v0.10.0

func (m *MemoryStore) Append(ctx context.Context, event *Event) (Offset, error)

Append implements EventStore

func (*MemoryStore) LoadOffset added in v0.10.0

func (m *MemoryStore) LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)

LoadOffset implements SubscriptionStore

func (*MemoryStore) Read added in v0.10.0

func (m *MemoryStore) Read(ctx context.Context, from Offset, limit int) ([]*StoredEvent, Offset, error)

Read implements EventStore

func (*MemoryStore) ReadStream added in v0.10.0

func (m *MemoryStore) ReadStream(ctx context.Context, from Offset) iter.Seq2[*StoredEvent, error]

ReadStream implements EventStoreStreamer for memory-efficient event iteration. Note: This takes a filtered snapshot of matching events to avoid holding the lock during iteration, which could cause deadlocks if handlers call other store methods.

func (*MemoryStore) SaveOffset added in v0.10.0

func (m *MemoryStore) SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error

SaveOffset implements SubscriptionStore

type Observability added in v0.8.3

type Observability interface {
	// OnPublishStart is called when an event is about to be published.
	// Returns a context that will be passed to handlers and subsequent hooks.
	// The event parameter allows implementations to extract custom attributes.
	OnPublishStart(ctx context.Context, eventType string, event any) context.Context

	// OnPublishComplete is called after all synchronous handlers complete.
	// Note: This is called before async handlers complete.
	OnPublishComplete(ctx context.Context, eventType string)

	// OnHandlerStart is called before a handler executes.
	// Returns a context for the handler execution.
	OnHandlerStart(ctx context.Context, eventType string, async bool) context.Context

	// OnHandlerComplete is called after a handler completes.
	// The error parameter is non-nil if the handler panicked.
	OnHandlerComplete(ctx context.Context, duration time.Duration, err error)

	// OnPersistStart is called before persisting an event.
	OnPersistStart(ctx context.Context, eventType string, position int64) context.Context

	// OnPersistComplete is called after persisting an event.
	OnPersistComplete(ctx context.Context, duration time.Duration, err error)
}

Observability is an optional interface for metrics and tracing. Implementations can track event publishing, handler execution, and errors.

This interface is designed to be zero-cost when not used - if no observability is configured, there is no performance overhead.

The context returned from each method can be used to propagate trace spans and other context-specific data through the event processing pipeline.

Example implementation: see github.com/jilio/ebu/otel package for OpenTelemetry integration.

type Offset added in v0.10.0

type Offset string

Offset represents an opaque position in an event stream. Implementations define the format (e.g., "123", "abc_456", timestamp-based). Offsets are lexicographically comparable within the same store.

const (
	// OffsetOldest represents the beginning of the stream.
	// When passed to Read, returns events from the start.
	OffsetOldest Offset = ""

	// OffsetNewest represents the current end of the stream.
	// Useful for subscribing to only new events.
	OffsetNewest Offset = "$"
)

type Option added in v0.5.0

type Option func(*EventBus)

Option is a function that configures the EventBus

func WithAfterPublish added in v0.5.0

func WithAfterPublish(hook PublishHook) Option

WithAfterPublish sets a hook that's called after publishing events

func WithAfterPublishContext added in v0.9.1

func WithAfterPublishContext(hook PublishHookContext) Option

WithAfterPublishContext sets a context-aware hook that's called after publishing events

func WithBeforePublish added in v0.5.0

func WithBeforePublish(hook PublishHook) Option

WithBeforePublish sets a hook that's called before publishing events

func WithBeforePublishContext added in v0.9.1

func WithBeforePublishContext(hook PublishHookContext) Option

WithBeforePublishContext sets a context-aware hook that's called before publishing events

func WithObservability added in v0.8.3

func WithObservability(obs Observability) Option

WithObservability sets the observability implementation for metrics and tracing

func WithPanicHandler added in v0.5.0

func WithPanicHandler(handler PanicHandler) Option

WithPanicHandler sets a panic handler for the event bus

func WithPersistenceErrorHandler added in v0.8.0

func WithPersistenceErrorHandler(handler PersistenceErrorHandler) Option

WithPersistenceErrorHandler sets the error handler for persistence failures

func WithPersistenceTimeout added in v0.8.0

func WithPersistenceTimeout(timeout time.Duration) Option

WithPersistenceTimeout sets the timeout for persistence operations

func WithReplayBatchSize added in v0.10.0

func WithReplayBatchSize(size int) Option

WithReplayBatchSize sets the batch size for Replay operations. This controls how many events are read at a time when using a store that doesn't implement EventStoreStreamer. Default is 100 if not set or set to 0.

func WithStore added in v0.3.0

func WithStore(store EventStore) Option

WithStore enables persistence with the given store

func WithSubscriptionStore added in v0.10.0

func WithSubscriptionStore(store SubscriptionStore) Option

WithSubscriptionStore enables subscription position tracking

func WithUpcast added in v0.8.0

func WithUpcast(fromType, toType string, upcast UpcastFunc) Option

WithUpcast adds an upcast function during bus creation

func WithUpcastErrorHandler added in v0.8.0

func WithUpcastErrorHandler(handler UpcastErrorHandler) Option

WithUpcastErrorHandler sets the error handler for upcast failures

type PanicHandler

type PanicHandler func(event any, handlerType reflect.Type, panicValue any)

PanicHandler is called when a handler panics

type PersistenceErrorHandler added in v0.8.0

type PersistenceErrorHandler func(event any, eventType reflect.Type, err error)

PersistenceErrorHandler is called when event persistence fails

type PublishHook added in v0.2.0

type PublishHook func(eventType reflect.Type, event any)

PublishHook is called when an event is published

type PublishHookContext added in v0.9.1

type PublishHookContext func(ctx context.Context, eventType reflect.Type, event any)

PublishHookContext is called when an event is published (with context)

type StoredEvent added in v0.3.0

type StoredEvent struct {
	Offset    Offset          `json:"offset"`
	Type      string          `json:"type"`
	Data      json.RawMessage `json:"data"`
	Timestamp time.Time       `json:"timestamp"`
}

StoredEvent represents an event that has been persisted with an offset.

type SubscribeOption

type SubscribeOption func(*internalHandler)

SubscribeOption configures a subscription

func Async

func Async() SubscribeOption

Async makes the handler execute asynchronously

func Once

func Once() SubscribeOption

Once makes the handler execute only once

func Sequential added in v0.5.0

func Sequential() SubscribeOption

Sequential ensures the handler executes sequentially (with mutex)

func WithFilter added in v0.7.0

func WithFilter[T any](predicate func(T) bool) SubscribeOption

WithFilter configures the handler to only receive events that match the predicate

type SubscriptionStore added in v0.10.0

type SubscriptionStore interface {
	// SaveOffset persists the current offset for a subscription.
	SaveOffset(ctx context.Context, subscriptionID string, offset Offset) error

	// LoadOffset retrieves the last saved offset for a subscription.
	// Returns OffsetOldest if the subscription has no saved offset.
	LoadOffset(ctx context.Context, subscriptionID string) (Offset, error)
}

SubscriptionStore tracks subscription progress separately from event storage. This interface is optional and enables resumable subscriptions.

type TypeNamer added in v0.8.2

type TypeNamer interface {
	EventTypeName() string
}

TypeNamer is an optional interface that events can implement to provide their own type name. This gives explicit control over event type naming, which is useful for:

  • Stable type names across package refactoring
  • Custom versioning schemes (e.g., "UserCreated.v2")
  • Compatibility with external event stores

If an event implements TypeNamer, EventType() will use the provided name instead of the reflection-based package-qualified name.

Example:

type UserCreatedEvent struct {
    UserID string
}

func (e UserCreatedEvent) EventTypeName() string {
    return "user.created.v1"
}

type UpcastErrorHandler added in v0.8.0

type UpcastErrorHandler func(eventType string, data json.RawMessage, err error)

UpcastErrorHandler is called when an upcast operation fails

type UpcastFunc added in v0.8.0

type UpcastFunc func(data json.RawMessage) (json.RawMessage, string, error)

UpcastFunc transforms event data from one version to another. It receives the raw JSON data and returns transformed data with the new type name.

type Upcaster added in v0.8.0

type Upcaster struct {
	FromType string     // Source event type
	ToType   string     // Target event type
	Upcast   UpcastFunc // Transformation function
}

Upcaster represents a transformation from one event type to another

Directories

Path Synopsis
otel module
Package state implements the Durable Streams State Protocol for ebu.
Package state implements the Durable Streams State Protocol for ebu.
stores
durablestream module
sqlite module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL