processing

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PIPELINE_OPERATOR = "PIPELINE_OPERATOR"
	FILTER_OPERATOR   = "FILTER_OPERATOR"
	MAP_OPERATOR      = "MAP_OPERATOR"
	FANIN_OPERATOR    = "FANIN_OPERATOR" // For homogeneous multi-stream operations
	JOIN_OPERATOR     = "JOIN_OPERATOR"  // For heterogeneous 2-stream joins
)

Variables

View Source
var (
	ErrJoinRequiresTwoInputs     = errors.New("Join operator requires exactly two input streams")
	ErrLeftJoinRequiresTwoInputs = errors.New("LeftJoin operator requires exactly two input streams")
)
View Source
var (
	ErrPipelineOperatorInputOutput = errors.New("pipeline operator needs exactly 1 input and at least 1 output")
	ErrInvalidPipelineOperation    = errors.New("invalid operation type for pipeline operator, expected func([]events.Event[TIn]) []Tout")
	ErrFilterOperatorInputOutput   = errors.New("filter operator needs exactly 1 input and at least 1 output")
	ErrInvalidFilterPredicate      = errors.New("invalid predicate type for filter operator, expected func(events.Event[TIn]) bool")
	ErrUnknownOperatorType         = errors.New("unknown operator type")
	ErrMapOperatorInputOutput      = errors.New("map operator needs exactly 1 input and at least 1 output")
	ErrInvalidMapOperation         = errors.New("invalid operation type for map operator, expected func(events.Event[TIn]) Tout")
	ErrNilOperator                 = errors.New("operator is considered nil (either id or operator is nil)")
	ErrOperatorAlreadyExists       = errors.New("operator already exists")
	ErrFanInOperatorInputOutput    = errors.New("fan-in operator needs at least 2 inputs and at least 1 output")
	ErrInvalidFanInOperation       = errors.New("invalid operation type for fan-in operator, expected func(map[int][]events.Event[TIn]) []Tout")
)
View Source
var (
	ErrNilStream       = errors.New("builder: stream cannot be nil")
	ErrOutputUndefined = errors.New("builder: output undefined")
	ErrEmptyInput      = errors.New("builder: operator requires at least one input stream")
	ErrAmbiguousOutput = errors.New("builder: query results in multiple output streams, cannot determine main output")
)
View Source
var (
	ErrQueryNil        = errors.New("query: query cannot be nil")
	ErrInvalidCallback = errors.New("query: callback cannot be nil and needs to implement func(event events.Event[T])")
)
View Source
var (
	ErrInvalidQueryID     = errors.New("query: invalid query ID")
	ErrQueryAlreadyExists = errors.New("query: query with this ID already exists")
)

Functions

func BatchCount

func BatchCount[TEvent any, TOut number](policy events.PolicyDescription) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

BatchCount creates a query that counts events over a window defined by the selection policy.

func BatchSum

func BatchSum[TEvent number](policy events.PolicyDescription) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

BatchSum creates a query that sums numeric events over a window defined by the selection policy.

func Close

func Close(qs ContinuousQuery) error

Close stops the query and unsubscribes the output receiver.

func Convert

func Convert[TIn, TOut number]() func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Convert creates a query that converts events from one numeric type to another.

func Filter

func Filter[T any](predicate func(events.Event[T]) bool) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Filter creates a query that filters events based on a provided predicate.

func FlatMap

func FlatMap[TIn, TOut any](mapper func(events.Event[TIn]) []TOut) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

FlatMap creates a query that maps one input event to zero or more output events.

func FromSourceStream deprecated

func FromSourceStream[T any](topic string, options ...pubsub.StreamOption) func(q ContinuousQuery) StreamWError

Deprecated: Use NewBuilder() and Builder.From() instead.

func Greater

func Greater[T number](greaterThan T) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Greater creates a query that filters events greater than a specified value.

func Join

func Join(
	key string,
	policy events.PolicyDescription,
) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Join creates a join operator that performs an inner join on two streams of maps. It joins on the provided key and merges the two event maps.

func LeftJoin

func LeftJoin(
	key string,
	policy events.PolicyDescription,
) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

LeftJoin creates a join operator that performs a left outer join on two streams of maps.

func Map

func Map[TIn, TOut any](mapper func(events.Event[TIn]) TOut) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Map creates a query that maps events from one type to another using a provided mapper function.

func Observe

func Observe[T any](callback func(events.Event[T])) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Observe creates an operator that executes a side-effect for each event but passes it through unchanged.

func Process deprecated

func Process[T any](
	operatorCreationFunc func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error),
	fromF func(q ContinuousQuery) StreamWError,
	options ...pubsub.StreamOption,
) func(q ContinuousQuery) StreamWError

Deprecated: Use NewBuilder() and Builder.Process() instead.

func RemoveOperator

func RemoveOperator(oid OperatorID) error

func SelectFromMap

func SelectFromMap(key string) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

SelectFromMap creates an operator that selects a value from a map by key and forwards it. The input stream must contain events of type `map[string]any`. The output will be of type `any`. If the key is not found, the operator forwards an event with nil content.

func Smaller

func Smaller[T number](than T) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)

Smaller creates a query that filters events smaller than a specified value.

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides an API to construct ContinuousQueries.

func NewBuilder

func NewBuilder[T any](opts ...QueryOption) *Builder

NewBuilder creates a new query builder.

func (*Builder) AddInput

func (b *Builder) AddInput(createFunc StreamCreationOptions) *Builder

AddInput adds a stream to the query. It is an alias for From.

func (*Builder) Build

func (b *Builder) Build(run bool) (ContinuousQuery, error)

Build constructs the TypedContinuousQuery.

func (*Builder) From

func (b *Builder) From(createFunc StreamCreationOptions) *Builder

From adds a stream to the query.

func (*Builder) Merge

func (b *Builder) Merge(b2 *Builder) *Builder

Merge merges the state of another builder into this one.

func (*Builder) Process

func (b *Builder) Process(operatorFunc OperatorCreationOptions) *Builder

Process adds an operator to the query

type ContinuousQuery

type ContinuousQuery interface {
	ID() ID

	Run() error
	Subscribe(callback any, options ...pubsub.SubscriberOption) error
	// contains filtered or unexported methods
}

ContinuousQuery represents a running query that processes streams.

func Query deprecated

func Query[T any](
	fromF func(q ContinuousQuery) StreamWError,
	opts ...QueryOption,
) (q ContinuousQuery, err error)

Deprecated: Use NewBuilder() instead.

type CreateOperatorFunc

type CreateOperatorFunc func([]pubsub.StreamID, []pubsub.StreamID, OperatorID) (OperatorID, error)

type CreateStreamFunc

type CreateStreamFunc func(repo *pubsub.StreamRepository) (pubsub.StreamID, error)

CreateStreamFunc encapsulates the creation logic for a source stream.

type FanInOperatorEngine

type FanInOperatorEngine[TIn any, TOut any] struct {
	// contains filtered or unexported fields
}

func (*FanInOperatorEngine) ID

func (o *FanInOperatorEngine) ID() OperatorID

func (*FanInOperatorEngine) InStream

func (o *FanInOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)

func (*FanInOperatorEngine) OutStream

func (o *FanInOperatorEngine) OutStream(to pubsub.StreamID)

func (*FanInOperatorEngine[TIn, TOut]) Start

func (o *FanInOperatorEngine[TIn, TOut]) Start() error

func (*FanInOperatorEngine[TIn, TOut]) Stop

func (o *FanInOperatorEngine[TIn, TOut]) Stop() error

type FilterOperatorEngine

type FilterOperatorEngine[TIN any] struct {
	// contains filtered or unexported fields
}

func (*FilterOperatorEngine) ID

func (o *FilterOperatorEngine) ID() OperatorID

func (*FilterOperatorEngine) InStream

func (o *FilterOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)

func (*FilterOperatorEngine) OutStream

func (o *FilterOperatorEngine) OutStream(to pubsub.StreamID)

func (*FilterOperatorEngine[TIN]) Process

func (o *FilterOperatorEngine[TIN]) Process(in ...events.Event[TIN])

func (*FilterOperatorEngine[TIN]) Start

func (o *FilterOperatorEngine[TIN]) Start() error

func (*FilterOperatorEngine) Stop

func (o *FilterOperatorEngine) Stop() error

type ID

type ID uuid.UUID

ID uniquely identifies a query in the repository.

func (ID) String

func (id ID) String() string

String returns the string representation of the ID.

type InputConfig

type InputConfig struct {
	Stream      pubsub.StreamID          `yaml:"stream" json:"stream"`
	InputPolicy events.PolicyDescription `yaml:"policy" json:"policy"`
}

func MakeInputConfigs

func MakeInputConfigs(in []pubsub.StreamID, policy events.PolicyDescription) []InputConfig

type JoinOperatorEngine

type JoinOperatorEngine[TLeft, TRight, TOut any] struct {
	// contains filtered or unexported fields
}

JoinOperatorEngine is a blueprint for a specialized engine to join two streams of potentially different types.

func (*JoinOperatorEngine[TLeft, TRight, TOut]) ID

func (o *JoinOperatorEngine[TLeft, TRight, TOut]) ID() OperatorID

func (*JoinOperatorEngine[TLeft, TRight, TOut]) InStream

func (o *JoinOperatorEngine[TLeft, TRight, TOut]) InStream(from pubsub.StreamID, description events.PolicyDescription)

func (*JoinOperatorEngine[TLeft, TRight, TOut]) OutStream

func (o *JoinOperatorEngine[TLeft, TRight, TOut]) OutStream(to pubsub.StreamID)

func (*JoinOperatorEngine[TLeft, TRight, TOut]) Start

func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Start() error

func (*JoinOperatorEngine[TLeft, TRight, TOut]) Stop

func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Stop() error

type MapOperatorEngine

type MapOperatorEngine[TIN any, TOUT any] struct {
	// contains filtered or unexported fields
}

func (*MapOperatorEngine) ID

func (o *MapOperatorEngine) ID() OperatorID

func (*MapOperatorEngine) InStream

func (o *MapOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)

func (*MapOperatorEngine) OutStream

func (o *MapOperatorEngine) OutStream(to pubsub.StreamID)

func (*MapOperatorEngine[TIN, TOUT]) Process

func (o *MapOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])

func (*MapOperatorEngine[TIN, TOUT]) ProcessSingleEvent

func (o *MapOperatorEngine[TIN, TOUT]) ProcessSingleEvent(event events.Event[TIN])

func (*MapOperatorEngine[TIN, TOUT]) Start

func (o *MapOperatorEngine[TIN, TOUT]) Start() error

func (*MapOperatorEngine) Stop

func (o *MapOperatorEngine) Stop() error

type ORepository

type ORepository interface {
	Get(id OperatorID) (OperatorEngine, bool)

	List() map[OperatorID]OperatorEngine
	// contains filtered or unexported methods
}

func OperatorRepository

func OperatorRepository() ORepository

type OperatorConfig

type OperatorConfig struct {
	AutoStart bool              `yaml:"auto_start" json:"auto_start"`
	Type      string            `yaml:"type" json:"type"`
	ID        OperatorID        `yaml:"id" json:"id"`
	Inputs    []InputConfig     `yaml:"inputs" json:"inputs"`
	Outputs   []pubsub.StreamID `yaml:"outputs" json:"outputs"`
}

func MakeOperatorConfig

func MakeOperatorConfig(operatorType string, opts ...OperatorOption) OperatorConfig

type OperatorCreationOptions

type OperatorCreationOptions struct {
	// contains filtered or unexported fields
}

func CreateFanOutStream

func CreateFanOutStream[TOut any](operatorCreateFunc CreateOperatorFunc, numOutputs int, opts ...pubsub.StreamOption) OperatorCreationOptions

func Operator

func Operator[TOut any](operatorCreateFunc CreateOperatorFunc, opts ...pubsub.StreamOption) OperatorCreationOptions

type OperatorEngine

type OperatorEngine interface {
	ID() OperatorID
	Start() error
	Stop() error
	InStream(from pubsub.StreamID, description events.PolicyDescription)
	OutStream(to pubsub.StreamID)
}

type OperatorID

type OperatorID uuid.UUID

func NewJoinOperator

func NewJoinOperator[TLeft, TRight, TOut any](
	config OperatorConfig,
	joinFunc func(left []events.Event[TLeft], right []events.Event[TRight]) []TOut,
) (OperatorID, error)

NewJoinOperator is a blueprint for a factory function to create a heterogeneous JoinOperatorEngine.

func NewOperator

func NewOperator[TIn, Tout any](operation any, d OperatorConfig, id OperatorID) (OperatorID, error)

func NewOperatorID

func NewOperatorID() OperatorID

func NilOperatorID

func NilOperatorID() OperatorID

func (OperatorID) String

func (o OperatorID) String() string

type OperatorOption

type OperatorOption func(*OperatorConfig)

func WithAutoStart

func WithAutoStart(auto bool) OperatorOption

func WithInput

func WithInput(inputs ...InputConfig) OperatorOption

func WithOutput

func WithOutput(topics ...pubsub.StreamID) OperatorOption

type PipelineOperatorEngine

type PipelineOperatorEngine[TIN any, TOUT any] struct {
	// contains filtered or unexported fields
}

func (*PipelineOperatorEngine) ID

func (o *PipelineOperatorEngine) ID() OperatorID

func (*PipelineOperatorEngine) InStream

func (o *PipelineOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)

func (*PipelineOperatorEngine) OutStream

func (o *PipelineOperatorEngine) OutStream(to pubsub.StreamID)

func (*PipelineOperatorEngine[TIN, TOUT]) Process

func (o *PipelineOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])

func (*PipelineOperatorEngine[TIN, TOUT]) Start

func (o *PipelineOperatorEngine[TIN, TOUT]) Start() error

func (*PipelineOperatorEngine) Stop

func (o *PipelineOperatorEngine) Stop() error

type QueryOption

type QueryOption func(*queryOptions)

func WithNewRepository

func WithNewRepository() QueryOption

func WithRepository

func WithRepository(r *pubsub.StreamRepository) QueryOption

type Repository

type Repository interface {
	fmt.Stringer
	Get(id ID) (ContinuousQuery, bool)

	List() map[ID]ContinuousQuery
	// contains filtered or unexported methods
}

Repository manages the storage and retrieval of ContinuousQueries.

func QueryRepository

func QueryRepository() Repository

QueryRepository returns the singleton instance of the query repository.

type StreamCreationOptions

type StreamCreationOptions struct {
	// contains filtered or unexported fields
}

func Source

func Source[T any](topic string, opts ...pubsub.StreamOption) StreamCreationOptions

Source creates a StreamDef for a source stream of type T.

type StreamWError deprecated

type StreamWError struct {
	// contains filtered or unexported fields
}

Deprecated: Use NewBuilder() instead.

func OnStream deprecated

func OnStream[T any](stream StreamWError) StreamWError

Deprecated: Use NewBuilder() instead.

type TypedContinuousQuery

type TypedContinuousQuery[T any] struct {
	// contains filtered or unexported fields
}

TypedContinuousQuery is a typed wrapper around ContinuousQuery that provides a typed output receiver.

func (*TypedContinuousQuery[T]) ID

func (c *TypedContinuousQuery[T]) ID() ID

ID returns the unique identifier of the query.

func (*TypedContinuousQuery[T]) Run

func (c *TypedContinuousQuery[T]) Run() error

func (*TypedContinuousQuery[T]) Subscribe

func (c *TypedContinuousQuery[T]) Subscribe(
	callback any,
	options ...pubsub.SubscriberOption,
) error

type TypedOperatorExecutor

type TypedOperatorExecutor[TIn any] interface {
	OperatorEngine
	Process(event ...events.Event[TIn])
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL