Documentation
¶
Index ¶
- Constants
- Variables
- func BatchCount[TEvent any, TOut number](policy events.PolicyDescription) ...
- func BatchSum[TEvent number](policy events.PolicyDescription) ...
- func Close(qs ContinuousQuery) error
- func Convert[TIn, TOut number]() ...
- func Filter[T any](predicate func(events.Event[T]) bool) ...
- func FlatMap[TIn, TOut any](mapper func(events.Event[TIn]) []TOut) ...
- func FromSourceStream[T any](topic string, options ...pubsub.StreamOption) func(q ContinuousQuery) StreamWErrordeprecated
- func Greater[T number](greaterThan T) ...
- func Join(key string, policy events.PolicyDescription) ...
- func LeftJoin(key string, policy events.PolicyDescription) ...
- func Map[TIn, TOut any](mapper func(events.Event[TIn]) TOut) ...
- func Observe[T any](callback func(events.Event[T])) ...
- func Process[T any](...) func(q ContinuousQuery) StreamWErrordeprecated
- func RemoveOperator(oid OperatorID) error
- func SelectFromMap(key string) ...
- func Smaller[T number](than T) ...
- type Builder
- func (b *Builder) AddInput(createFunc StreamCreationOptions) *Builder
- func (b *Builder) Build(run bool) (ContinuousQuery, error)
- func (b *Builder) From(createFunc StreamCreationOptions) *Builder
- func (b *Builder) Merge(b2 *Builder) *Builder
- func (b *Builder) Process(operatorFunc OperatorCreationOptions) *Builder
- type ContinuousQuery
- type CreateOperatorFunc
- type CreateStreamFunc
- type FanInOperatorEngine
- func (o *FanInOperatorEngine) ID() OperatorID
- func (o *FanInOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
- func (o *FanInOperatorEngine) OutStream(to pubsub.StreamID)
- func (o *FanInOperatorEngine[TIn, TOut]) Start() error
- func (o *FanInOperatorEngine[TIn, TOut]) Stop() error
- type FilterOperatorEngine
- func (o *FilterOperatorEngine) ID() OperatorID
- func (o *FilterOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
- func (o *FilterOperatorEngine) OutStream(to pubsub.StreamID)
- func (o *FilterOperatorEngine[TIN]) Process(in ...events.Event[TIN])
- func (o *FilterOperatorEngine[TIN]) Start() error
- func (o *FilterOperatorEngine) Stop() error
- type ID
- type InputConfig
- type JoinOperatorEngine
- func (o *JoinOperatorEngine[TLeft, TRight, TOut]) ID() OperatorID
- func (o *JoinOperatorEngine[TLeft, TRight, TOut]) InStream(from pubsub.StreamID, description events.PolicyDescription)
- func (o *JoinOperatorEngine[TLeft, TRight, TOut]) OutStream(to pubsub.StreamID)
- func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Start() error
- func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Stop() error
- type MapOperatorEngine
- func (o *MapOperatorEngine) ID() OperatorID
- func (o *MapOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
- func (o *MapOperatorEngine) OutStream(to pubsub.StreamID)
- func (o *MapOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])
- func (o *MapOperatorEngine[TIN, TOUT]) ProcessSingleEvent(event events.Event[TIN])
- func (o *MapOperatorEngine[TIN, TOUT]) Start() error
- func (o *MapOperatorEngine) Stop() error
- type ORepository
- type OperatorConfig
- type OperatorCreationOptions
- type OperatorEngine
- type OperatorID
- type OperatorOption
- type PipelineOperatorEngine
- func (o *PipelineOperatorEngine) ID() OperatorID
- func (o *PipelineOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
- func (o *PipelineOperatorEngine) OutStream(to pubsub.StreamID)
- func (o *PipelineOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])
- func (o *PipelineOperatorEngine[TIN, TOUT]) Start() error
- func (o *PipelineOperatorEngine) Stop() error
- type QueryOption
- type Repository
- type StreamCreationOptions
- type StreamWErrordeprecated
- type TypedContinuousQuery
- type TypedOperatorExecutor
Constants ¶
const ( PIPELINE_OPERATOR = "PIPELINE_OPERATOR" FILTER_OPERATOR = "FILTER_OPERATOR" MAP_OPERATOR = "MAP_OPERATOR" FANIN_OPERATOR = "FANIN_OPERATOR" // For homogeneous multi-stream operations JOIN_OPERATOR = "JOIN_OPERATOR" // For heterogeneous 2-stream joins )
Variables ¶
var ( ErrJoinRequiresTwoInputs = errors.New("Join operator requires exactly two input streams") ErrLeftJoinRequiresTwoInputs = errors.New("LeftJoin operator requires exactly two input streams") )
var ( ErrPipelineOperatorInputOutput = errors.New("pipeline operator needs exactly 1 input and at least 1 output") ErrInvalidPipelineOperation = errors.New("invalid operation type for pipeline operator, expected func([]events.Event[TIn]) []Tout") ErrFilterOperatorInputOutput = errors.New("filter operator needs exactly 1 input and at least 1 output") ErrInvalidFilterPredicate = errors.New("invalid predicate type for filter operator, expected func(events.Event[TIn]) bool") ErrUnknownOperatorType = errors.New("unknown operator type") ErrMapOperatorInputOutput = errors.New("map operator needs exactly 1 input and at least 1 output") ErrInvalidMapOperation = errors.New("invalid operation type for map operator, expected func(events.Event[TIn]) Tout") ErrNilOperator = errors.New("operator is considered nil (either id or operator is nil)") ErrOperatorAlreadyExists = errors.New("operator already exists") ErrFanInOperatorInputOutput = errors.New("fan-in operator needs at least 2 inputs and at least 1 output") ErrInvalidFanInOperation = errors.New("invalid operation type for fan-in operator, expected func(map[int][]events.Event[TIn]) []Tout") )
var ( ErrNilStream = errors.New("builder: stream cannot be nil") ErrOutputUndefined = errors.New("builder: output undefined") ErrEmptyInput = errors.New("builder: operator requires at least one input stream") ErrAmbiguousOutput = errors.New("builder: query results in multiple output streams, cannot determine main output") )
var ( ErrQueryNil = errors.New("query: query cannot be nil") ErrInvalidCallback = errors.New("query: callback cannot be nil and needs to implement func(event events.Event[T])") )
var ( ErrInvalidQueryID = errors.New("query: invalid query ID") ErrQueryAlreadyExists = errors.New("query: query with this ID already exists") )
Functions ¶
func BatchCount ¶
func BatchCount[TEvent any, TOut number](policy events.PolicyDescription) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
BatchCount creates a query that counts events over a window defined by the selection policy.
func BatchSum ¶
func BatchSum[TEvent number](policy events.PolicyDescription) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
BatchSum creates a query that sums numeric events over a window defined by the selection policy.
func Close ¶
func Close(qs ContinuousQuery) error
Close stops the query and unsubscribes the output receiver.
func Convert ¶
func Convert[TIn, TOut number]() func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Convert creates a query that converts events from one numeric type to another.
func Filter ¶
func Filter[T any](predicate func(events.Event[T]) bool) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Filter creates a query that filters events based on a provided predicate.
func FlatMap ¶
func FlatMap[TIn, TOut any](mapper func(events.Event[TIn]) []TOut) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
FlatMap creates a query that maps one input event to zero or more output events.
func FromSourceStream
deprecated
func FromSourceStream[T any](topic string, options ...pubsub.StreamOption) func(q ContinuousQuery) StreamWError
Deprecated: Use NewBuilder() and Builder.From() instead.
func Greater ¶
func Greater[T number](greaterThan T) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Greater creates a query that filters events greater than a specified value.
func Join ¶
func Join( key string, policy events.PolicyDescription, ) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Join creates a join operator that performs an inner join on two streams of maps. It joins on the provided key and merges the two event maps.
func LeftJoin ¶
func LeftJoin( key string, policy events.PolicyDescription, ) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
LeftJoin creates a join operator that performs a left outer join on two streams of maps.
func Map ¶
func Map[TIn, TOut any](mapper func(events.Event[TIn]) TOut) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Map creates a query that maps events from one type to another using a provided mapper function.
func Observe ¶
func Observe[T any](callback func(events.Event[T])) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Observe creates an operator that executes a side-effect for each event but passes it through unchanged.
func Process
deprecated
func Process[T any]( operatorCreationFunc func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error), fromF func(q ContinuousQuery) StreamWError, options ...pubsub.StreamOption, ) func(q ContinuousQuery) StreamWError
Deprecated: Use NewBuilder() and Builder.Process() instead.
func RemoveOperator ¶
func RemoveOperator(oid OperatorID) error
func SelectFromMap ¶
func SelectFromMap(key string) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
SelectFromMap creates an operator that selects a value from a map by key and forwards it. The input stream must contain events of type `map[string]any`. The output will be of type `any`. If the key is not found, the operator forwards an event with nil content.
func Smaller ¶
func Smaller[T number](than T) func(in []pubsub.StreamID, out []pubsub.StreamID, id OperatorID) (OperatorID, error)
Smaller creates a query that filters events smaller than a specified value.
Types ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides an API to construct ContinuousQueries.
func NewBuilder ¶
func NewBuilder[T any](opts ...QueryOption) *Builder
NewBuilder creates a new query builder.
func (*Builder) AddInput ¶
func (b *Builder) AddInput(createFunc StreamCreationOptions) *Builder
AddInput adds a stream to the query. It is an alias for From.
func (*Builder) Build ¶
func (b *Builder) Build(run bool) (ContinuousQuery, error)
Build constructs the TypedContinuousQuery.
func (*Builder) From ¶
func (b *Builder) From(createFunc StreamCreationOptions) *Builder
From adds a stream to the query.
func (*Builder) Process ¶
func (b *Builder) Process(operatorFunc OperatorCreationOptions) *Builder
Process adds an operator to the query
type ContinuousQuery ¶
type ContinuousQuery interface {
ID() ID
Run() error
Subscribe(callback any, options ...pubsub.SubscriberOption) error
// contains filtered or unexported methods
}
ContinuousQuery represents a running query that processes streams.
func Query
deprecated
func Query[T any]( fromF func(q ContinuousQuery) StreamWError, opts ...QueryOption, ) (q ContinuousQuery, err error)
Deprecated: Use NewBuilder() instead.
type CreateOperatorFunc ¶
type CreateOperatorFunc func([]pubsub.StreamID, []pubsub.StreamID, OperatorID) (OperatorID, error)
type CreateStreamFunc ¶
type CreateStreamFunc func(repo *pubsub.StreamRepository) (pubsub.StreamID, error)
CreateStreamFunc encapsulates the creation logic for a source stream.
type FanInOperatorEngine ¶
func (*FanInOperatorEngine) ID ¶
func (o *FanInOperatorEngine) ID() OperatorID
func (*FanInOperatorEngine) InStream ¶
func (o *FanInOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
func (*FanInOperatorEngine[TIn, TOut]) Start ¶
func (o *FanInOperatorEngine[TIn, TOut]) Start() error
func (*FanInOperatorEngine[TIn, TOut]) Stop ¶
func (o *FanInOperatorEngine[TIn, TOut]) Stop() error
type FilterOperatorEngine ¶
type FilterOperatorEngine[TIN any] struct { // contains filtered or unexported fields }
func (*FilterOperatorEngine) ID ¶
func (o *FilterOperatorEngine) ID() OperatorID
func (*FilterOperatorEngine) InStream ¶
func (o *FilterOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
func (*FilterOperatorEngine[TIN]) Process ¶
func (o *FilterOperatorEngine[TIN]) Process(in ...events.Event[TIN])
func (*FilterOperatorEngine[TIN]) Start ¶
func (o *FilterOperatorEngine[TIN]) Start() error
type InputConfig ¶
type InputConfig struct {
Stream pubsub.StreamID `yaml:"stream" json:"stream"`
InputPolicy events.PolicyDescription `yaml:"policy" json:"policy"`
}
func MakeInputConfigs ¶
func MakeInputConfigs(in []pubsub.StreamID, policy events.PolicyDescription) []InputConfig
type JoinOperatorEngine ¶
type JoinOperatorEngine[TLeft, TRight, TOut any] struct { // contains filtered or unexported fields }
JoinOperatorEngine is a blueprint for a specialized engine to join two streams of potentially different types.
func (*JoinOperatorEngine[TLeft, TRight, TOut]) ID ¶
func (o *JoinOperatorEngine[TLeft, TRight, TOut]) ID() OperatorID
func (*JoinOperatorEngine[TLeft, TRight, TOut]) InStream ¶
func (o *JoinOperatorEngine[TLeft, TRight, TOut]) InStream(from pubsub.StreamID, description events.PolicyDescription)
func (*JoinOperatorEngine[TLeft, TRight, TOut]) OutStream ¶
func (o *JoinOperatorEngine[TLeft, TRight, TOut]) OutStream(to pubsub.StreamID)
func (*JoinOperatorEngine[TLeft, TRight, TOut]) Start ¶
func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Start() error
func (*JoinOperatorEngine[TLeft, TRight, TOut]) Stop ¶
func (o *JoinOperatorEngine[TLeft, TRight, TOut]) Stop() error
type MapOperatorEngine ¶
func (*MapOperatorEngine) ID ¶
func (o *MapOperatorEngine) ID() OperatorID
func (*MapOperatorEngine) InStream ¶
func (o *MapOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
func (*MapOperatorEngine[TIN, TOUT]) Process ¶
func (o *MapOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])
func (*MapOperatorEngine[TIN, TOUT]) ProcessSingleEvent ¶
func (o *MapOperatorEngine[TIN, TOUT]) ProcessSingleEvent(event events.Event[TIN])
func (*MapOperatorEngine[TIN, TOUT]) Start ¶
func (o *MapOperatorEngine[TIN, TOUT]) Start() error
type ORepository ¶
type ORepository interface {
Get(id OperatorID) (OperatorEngine, bool)
List() map[OperatorID]OperatorEngine
// contains filtered or unexported methods
}
func OperatorRepository ¶
func OperatorRepository() ORepository
type OperatorConfig ¶
type OperatorConfig struct {
AutoStart bool `yaml:"auto_start" json:"auto_start"`
Type string `yaml:"type" json:"type"`
ID OperatorID `yaml:"id" json:"id"`
Inputs []InputConfig `yaml:"inputs" json:"inputs"`
Outputs []pubsub.StreamID `yaml:"outputs" json:"outputs"`
}
func MakeOperatorConfig ¶
func MakeOperatorConfig(operatorType string, opts ...OperatorOption) OperatorConfig
type OperatorCreationOptions ¶
type OperatorCreationOptions struct {
// contains filtered or unexported fields
}
func CreateFanOutStream ¶
func CreateFanOutStream[TOut any](operatorCreateFunc CreateOperatorFunc, numOutputs int, opts ...pubsub.StreamOption) OperatorCreationOptions
func Operator ¶
func Operator[TOut any](operatorCreateFunc CreateOperatorFunc, opts ...pubsub.StreamOption) OperatorCreationOptions
type OperatorEngine ¶
type OperatorEngine interface {
ID() OperatorID
Start() error
Stop() error
InStream(from pubsub.StreamID, description events.PolicyDescription)
OutStream(to pubsub.StreamID)
}
type OperatorID ¶
func NewJoinOperator ¶
func NewJoinOperator[TLeft, TRight, TOut any]( config OperatorConfig, joinFunc func(left []events.Event[TLeft], right []events.Event[TRight]) []TOut, ) (OperatorID, error)
NewJoinOperator is a blueprint for a factory function to create a heterogeneous JoinOperatorEngine.
func NewOperator ¶
func NewOperator[TIn, Tout any](operation any, d OperatorConfig, id OperatorID) (OperatorID, error)
func NewOperatorID ¶
func NewOperatorID() OperatorID
func NilOperatorID ¶
func NilOperatorID() OperatorID
func (OperatorID) String ¶
func (o OperatorID) String() string
type OperatorOption ¶
type OperatorOption func(*OperatorConfig)
func WithAutoStart ¶
func WithAutoStart(auto bool) OperatorOption
func WithInput ¶
func WithInput(inputs ...InputConfig) OperatorOption
func WithOutput ¶
func WithOutput(topics ...pubsub.StreamID) OperatorOption
type PipelineOperatorEngine ¶
func (*PipelineOperatorEngine) ID ¶
func (o *PipelineOperatorEngine) ID() OperatorID
func (*PipelineOperatorEngine) InStream ¶
func (o *PipelineOperatorEngine) InStream(in pubsub.StreamID, p events.PolicyDescription)
func (*PipelineOperatorEngine[TIN, TOUT]) Process ¶
func (o *PipelineOperatorEngine[TIN, TOUT]) Process(in ...events.Event[TIN])
func (*PipelineOperatorEngine[TIN, TOUT]) Start ¶
func (o *PipelineOperatorEngine[TIN, TOUT]) Start() error
type QueryOption ¶
type QueryOption func(*queryOptions)
func WithNewRepository ¶
func WithNewRepository() QueryOption
func WithRepository ¶
func WithRepository(r *pubsub.StreamRepository) QueryOption
type Repository ¶
type Repository interface {
fmt.Stringer
Get(id ID) (ContinuousQuery, bool)
List() map[ID]ContinuousQuery
// contains filtered or unexported methods
}
Repository manages the storage and retrieval of ContinuousQueries.
func QueryRepository ¶
func QueryRepository() Repository
QueryRepository returns the singleton instance of the query repository.
type StreamCreationOptions ¶
type StreamCreationOptions struct {
// contains filtered or unexported fields
}
func Source ¶
func Source[T any](topic string, opts ...pubsub.StreamOption) StreamCreationOptions
Source creates a StreamDef for a source stream of type T.
type StreamWError
deprecated
type StreamWError struct {
// contains filtered or unexported fields
}
Deprecated: Use NewBuilder() instead.
func OnStream
deprecated
func OnStream[T any](stream StreamWError) StreamWError
Deprecated: Use NewBuilder() instead.
type TypedContinuousQuery ¶
type TypedContinuousQuery[T any] struct { // contains filtered or unexported fields }
TypedContinuousQuery is a typed wrapper around ContinuousQuery that provides a typed output receiver.
func (*TypedContinuousQuery[T]) ID ¶
func (c *TypedContinuousQuery[T]) ID() ID
ID returns the unique identifier of the query.
func (*TypedContinuousQuery[T]) Run ¶
func (c *TypedContinuousQuery[T]) Run() error
func (*TypedContinuousQuery[T]) Subscribe ¶
func (c *TypedContinuousQuery[T]) Subscribe( callback any, options ...pubsub.SubscriberOption, ) error
type TypedOperatorExecutor ¶
type TypedOperatorExecutor[TIn any] interface { OperatorEngine Process(event ...events.Event[TIn]) }