Documentation
¶
Overview ¶
Package component defines the Discoverable interface and related types
Package component provides the core component infrastructure for SemStreams, enabling dynamic component discovery, registration, lifecycle management, and instance creation.
The component package defines fundamental abstractions for all SemStreams components, supporting four component types: inputs (data sources), processors (data transformers), outputs (data sinks), and storage (persistence). Components are self-describing units that can be discovered at runtime, configured through schemas, and managed through their lifecycle.
The Registry serves as the central component management system, handling both factory registration and instance management with thread-safe operations and proper lifecycle control.
Component Registration Pattern ¶
SemStreams uses EXPLICIT registration rather than init() self-registration. This provides:
- Testability: Can create isolated registries for testing
- Explicitness: Clear component dependency graph
- Control: Main application controls what gets registered
- No side effects: No global state modification during package initialization
Registration Flow:
- Each component package exports a Register(*Registry) error function
- componentregistry.RegisterAll() orchestrates all registrations
- main.go explicitly calls RegisterAll() with a created Registry
- Components are now available for instantiation
Example component registration:
// In pkg/input/udp.go
func Register(registry *component.Registry) error {
	return registry.RegisterWithConfig(component.RegistrationConfig{
		Name:        "udp",
		Factory:     CreateUDPInput,
		Schema:      udpSchema,
		Type:        "input",
		Protocol:    "udp",
		Domain:      "network",
		Description: "UDP input component for network data",
		Version:     "1.0.0",
	})
}

// In pkg/componentregistry/register.go
func RegisterAll(registry *component.Registry) error {
	if err := input.Register(registry); err != nil {
		return err
	}
	if err := robotics.Register(registry); err != nil {
		return err
	}
	// ... more registrations
	return nil
}

// In cmd/semstreams/main.go
registry := component.NewRegistry()
if err := componentregistry.RegisterAll(registry); err != nil {
	log.Fatal(err)
}
Quick Start ¶
Creating and using a component:
// Create component registry and register all components
registry := component.NewRegistry()
if err := componentregistry.RegisterAll(registry); err != nil {
	return err
}

// Create component configuration
config := types.ComponentConfig{
	Type:    types.ComponentTypeInput,
	Name:    "udp",
	Enabled: true,
	Config:  json.RawMessage(`{"port": 8080, "bind": "0.0.0.0"}`),
}

// Prepare component dependencies
deps := component.Dependencies{
	NATSClient: natsClient,
	Platform: component.PlatformMeta{
		Org:      "c360",
		Platform: "platform1",
	},
	Logger: slog.Default(),
}

// Create component instance
instance, err := registry.CreateComponent("udp-input-1", config, deps)
if err != nil {
	return err
}

// Component is now ready to use
meta := instance.Meta()
health := instance.Health()
Core Concepts ¶
Discoverable Interface:
Every component must implement Discoverable, providing metadata, port definitions, configuration schema, health status, and data flow metrics. This enables runtime introspection and management.
Registry Pattern:
The Registry manages component factories and instances with thread-safe operations. Components register explicitly via Register() functions called by componentregistry, and the Registry handles creation and lifecycle management.
Dependencies:
All external dependencies (NATS client, object store, metrics, logger, platform identity) are injected through the Dependencies struct, so factories never reach for globals.
Port Types:
Components declare their inputs and outputs using strongly-typed ports that implement the Portable interface:
- NATSPort: Core pub/sub messaging on NATS subjects
- JetStreamPort: Durable streaming with JetStream for reliable delivery
- KVWatchPort: Watch KV bucket changes for real-time state observation
- KVWritePort: Declare writes to KV buckets for flow validation
- NATSRequestPort: Request/reply pattern with timeouts
- NetworkPort: TCP/UDP network bindings for external connectivity
Example port configuration:
func (c *MyComponent) OutputPorts() []component.Port {
	return []component.Port{
		{
			Name:      "data_stream",
			Direction: component.DirectionOutput,
			Required:  true,
			Config:    component.NATSPort{Subject: "data.output"},
		},
		{
			Name:      "entity_states",
			Direction: component.DirectionOutput,
			Required:  false,
			Config: component.KVWritePort{
				Bucket: "ENTITY_STATES",
				Interface: &component.InterfaceContract{
					Type:    "graph.EntityState",
					Version: "v1",
				},
			},
		},
	}
}
Configuration Schema ¶
Components define their configuration through ConfigSchema, enabling:
- Schema-driven UI generation with type-specific form inputs
- Client-side and server-side validation before config persistence
- Property categorization (basic vs advanced) for progressive disclosure
- Default value population for improved user experience
Schema Definition Example:
func (u *UDPInput) ConfigSchema() component.ConfigSchema {
	return component.ConfigSchema{
		Properties: map[string]component.PropertySchema{
			"port": {
				Type:        "int",
				Description: "UDP port to listen on",
				Default:     14550,
				Minimum:     ptrInt(1),
				Maximum:     ptrInt(65535),
				Category:    "basic", // Shown by default in UI
			},
			"bind_address": {
				Type:        "string",
				Description: "IP address to bind to (0.0.0.0 for all interfaces)",
				Default:     "0.0.0.0",
				Category:    "basic",
			},
			"buffer_size": {
				Type:        "int",
				Description: "UDP receive buffer size in bytes",
				Default:     8192,
				Minimum:     ptrInt(512),
				Category:    "advanced", // Hidden in collapsible section
			},
		},
		Required: []string{"port", "bind_address"},
	}
}
Property Types:
- "string": Text input, optional pattern validation
- "int": Number input with min/max constraints
- "bool": Checkbox input
- "float": Number input allowing decimals
- "enum": Dropdown select with predefined values
- "object": Complex nested configuration (JSON editor fallback in MVP)
- "array": List of values (JSON editor fallback in MVP)
Schema Validation:
Configurations are validated both client-side (instant feedback) and server-side (before persistence) using the ValidateConfig() function:
config := map[string]any{
	"port": 99999, // Exceeds maximum
}
// Named validationErrs so the standard library "errors" package is not shadowed
validationErrs := component.ValidateConfig(config, schema)
if len(validationErrs) > 0 {
	// Returns: [{Field: "port", Message: "port must be <= 65535", Code: "max"}]
	// Frontend displays error next to the port field
}
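To make the validation step concrete, here is a minimal sketch of how a validator of this shape could work. It is not the package's implementation: `PropertySchema`, `ConfigSchema`, and `ValidationError` are reduced stand-ins holding only the fields used here, `validateConfig` and `ptrInt` are hypothetical helpers, and only required-key and min/max checks are covered.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the package's types (assumption: only these fields).
type PropertySchema struct {
	Type    string
	Minimum *int
	Maximum *int
}

type ConfigSchema struct {
	Properties map[string]PropertySchema
	Required   []string
}

type ValidationError struct {
	Field   string
	Message string
	Code    string
}

func ptrInt(v int) *int { return &v }

// validateConfig is a hypothetical, reduced validator: it reports missing
// required keys, then numeric values outside [Minimum, Maximum].
func validateConfig(config map[string]any, schema ConfigSchema) []ValidationError {
	var errs []ValidationError
	for _, name := range schema.Required {
		if _, ok := config[name]; !ok {
			errs = append(errs, ValidationError{Field: name, Message: name + " is required", Code: "required"})
		}
	}
	// Sort property names so the error order is deterministic.
	names := make([]string, 0, len(schema.Properties))
	for name := range schema.Properties {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		prop := schema.Properties[name]
		var n float64
		switch v := config[name].(type) {
		case int:
			n = float64(v)
		case float64: // numbers decoded from JSON into map[string]any arrive as float64
			n = v
		default:
			continue // absent or non-numeric: nothing to range-check in this sketch
		}
		if prop.Minimum != nil && n < float64(*prop.Minimum) {
			errs = append(errs, ValidationError{Field: name, Message: fmt.Sprintf("%s must be >= %d", name, *prop.Minimum), Code: "min"})
		}
		if prop.Maximum != nil && n > float64(*prop.Maximum) {
			errs = append(errs, ValidationError{Field: name, Message: fmt.Sprintf("%s must be <= %d", name, *prop.Maximum), Code: "max"})
		}
	}
	return errs
}

func main() {
	schema := ConfigSchema{
		Properties: map[string]PropertySchema{
			"port": {Type: "int", Minimum: ptrInt(1), Maximum: ptrInt(65535)},
		},
		Required: []string{"port", "bind_address"},
	}
	for _, e := range validateConfig(map[string]any{"port": 99999}, schema) {
		fmt.Printf("%s: %s (%s)\n", e.Field, e.Message, e.Code)
	}
}
```

Returning a slice of field-scoped errors rather than a single error is what lets a frontend attach each message to its own form field.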
Graceful Degradation:
Components without schemas still work - the UI falls back to a JSON editor. The system logs warnings when schemas are missing but continues operating:
// Component without schema - still functional
func (c *MyComponent) ConfigSchema() component.ConfigSchema {
	return component.ConfigSchema{} // Empty schema
}

// UI will show: "Schema not available, using JSON editor"
Property Categorization:
The Category field organizes properties for progressive disclosure:
- "basic": Common settings shown by default (port, host, enabled)
- "advanced": Expert settings in collapsible section (buffer sizes, timeouts)
- Empty/unset: Defaults to "advanced"
The UI renders basic properties first, then advanced properties in a collapsible <details> element. Properties within each category are sorted alphabetically for consistency.
Helper Functions:
- GetProperties(schema, category): Filter properties by category
- SortedPropertyNames(schema): Get property names in UI display order
- IsComplexType(propType): Identify object/array types needing special handling
- ValidateConfig(config, schema): Validate configuration against schema
Discoverable Interface ¶
All components must implement the Discoverable interface:
type Discoverable interface {
	Meta() Metadata             // Component metadata (name, type, version)
	InputPorts() []Port         // Input port definitions
	OutputPorts() []Port        // Output port definitions
	ConfigSchema() ConfigSchema // Configuration schema for validation
	Health() HealthStatus       // Current health status
	DataFlow() FlowMetrics      // Data flow metrics (messages, bytes)
}
This interface enables:
- Runtime introspection of component capabilities
- Dynamic configuration validation
- Health monitoring and metrics collection
- Data flow visualization and debugging
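As an illustration of runtime introspection, the sketch below implements the six methods on a small type and inspects it purely through the interface. The struct definitions (`Metadata`, `Port`, `HealthStatus`, and so on) are trimmed stand-ins, not the package's real types, and `staticComponent` and `describe` are hypothetical names.

```go
package main

import "fmt"

// Trimmed stand-ins for the package's types (assumption: fields reduced).
type Metadata struct{ Name, Type, Version string }
type Port struct{ Name string }
type ConfigSchema struct{}
type HealthStatus struct{ Healthy bool }
type FlowMetrics struct{}

type Discoverable interface {
	Meta() Metadata
	InputPorts() []Port
	OutputPorts() []Port
	ConfigSchema() ConfigSchema
	Health() HealthStatus
	DataFlow() FlowMetrics
}

// staticComponent is a hypothetical component with fixed metadata.
type staticComponent struct {
	meta    Metadata
	outputs []Port
}

// Compile-time check that *staticComponent satisfies Discoverable.
var _ Discoverable = (*staticComponent)(nil)

func (c *staticComponent) Meta() Metadata             { return c.meta }
func (c *staticComponent) InputPorts() []Port         { return nil }
func (c *staticComponent) OutputPorts() []Port        { return c.outputs }
func (c *staticComponent) ConfigSchema() ConfigSchema { return ConfigSchema{} }
func (c *staticComponent) Health() HealthStatus       { return HealthStatus{Healthy: true} }
func (c *staticComponent) DataFlow() FlowMetrics      { return FlowMetrics{} }

// describe inspects any component through the interface alone.
func describe(d Discoverable) string {
	m := d.Meta()
	return fmt.Sprintf("%s (%s) v%s healthy=%t in=%d out=%d",
		m.Name, m.Type, m.Version, d.Health().Healthy,
		len(d.InputPorts()), len(d.OutputPorts()))
}

func main() {
	c := &staticComponent{
		meta:    Metadata{Name: "udp", Type: "input", Version: "1.0.0"},
		outputs: []Port{{Name: "data_stream"}},
	}
	fmt.Println(describe(c))
}
```

The `var _ Discoverable = ...` line is a common Go idiom: a missing method becomes a compile error instead of a runtime surprise.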
Dependencies ¶
Dependencies are injected through a structured dependencies object:
type Dependencies struct {
	NATSClient      *natsclient.Client      // Required: messaging
	ObjectStore     ObjectStore             // Optional: persistence
	MetricsRegistry *metric.MetricsRegistry // Optional: Prometheus metrics
	Logger          *slog.Logger            // Optional: structured logging
	Platform        PlatformMeta            // Required: platform identity
}
Benefits:
- Clean dependency injection
- Easy testing with mock dependencies
- Avoids parameter proliferation in factory functions
- Follows service architecture patterns
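One way a factory might treat the required and optional fields is sketched below. This is an assumption about usage, not documented behaviour: `natsClient` is a stand-in for `*natsclient.Client`, `prepare` is a hypothetical helper, and treating an empty `Org` or `Platform` as "missing" is a guess at what "required" means for `PlatformMeta`.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

type natsClient struct{} // stand-in for *natsclient.Client (assumption)

type PlatformMeta struct{ Org, Platform string }

// Dependencies is reduced to the fields this sketch needs.
type Dependencies struct {
	NATSClient *natsClient
	Logger     *slog.Logger
	Platform   PlatformMeta
}

// prepare rejects missing required dependencies and fills in optional ones.
func prepare(deps Dependencies) (Dependencies, error) {
	if deps.NATSClient == nil {
		return deps, errors.New("dependencies: NATSClient is required")
	}
	if deps.Platform.Org == "" || deps.Platform.Platform == "" {
		return deps, errors.New("dependencies: Platform.Org and Platform.Platform are required")
	}
	if deps.Logger == nil {
		deps.Logger = slog.Default() // optional: fall back to the process-wide logger
	}
	return deps, nil
}

func main() {
	_, err := prepare(Dependencies{})
	fmt.Println("empty deps:", err)

	deps, err := prepare(Dependencies{
		NATSClient: &natsClient{},
		Platform:   PlatformMeta{Org: "c360", Platform: "platform1"},
	})
	fmt.Println("valid deps:", err, "logger set:", deps.Logger != nil)
}
```

Because `Dependencies` is passed by value, `prepare` can fill in defaults on its copy without mutating the caller's struct.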
Factory Pattern ¶
Component factories follow a consistent signature:
type Factory func(rawConfig json.RawMessage, deps Dependencies) (Discoverable, error)
Example factory implementation:
func CreateUDPInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error) {
	// Parse component-specific configuration
	var config UDPConfig
	if err := json.Unmarshal(rawConfig, &config); err != nil {
		return nil, fmt.Errorf("parse UDP config: %w", err)
	}

	// Validate configuration
	if err := config.Validate(); err != nil {
		return nil, fmt.Errorf("invalid UDP config: %w", err)
	}

	// Create component with dependencies
	return &UDPInput{
		config:     config,
		natsClient: deps.NATSClient,
		logger:     deps.Logger,
		platform:   deps.Platform,
	}, nil
}
Factories:
- Receive raw JSON configuration and parse it themselves
- Validate configuration before creating instances
- Return initialized components ready to use
- Follow service constructor patterns for consistency
Registry Thread Safety ¶
All Registry operations are thread-safe:
- Factory registration uses write locks
- Component creation uses read locks for factory lookup
- Instance tracking uses write locks
- Listing operations use read locks
Concurrency characteristics:
- Multiple goroutines can create components concurrently
- Factory registration blocks component creation temporarily
- ListAvailable() is safe to call during component creation
- No deadlocks due to ordered lock acquisition
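The locking scheme described above can be sketched with a single `sync.RWMutex`. This is an illustrative reduction, not the Registry's actual code: the `registry` type, its method names, and the `[]byte`-based factory signature are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type factory func(raw []byte) (any, error)

// registry is a hypothetical, reduced version of a factory/instance registry.
type registry struct {
	mu        sync.RWMutex
	factories map[string]factory
	instances map[string]any
}

func newRegistry() *registry {
	return &registry{factories: map[string]factory{}, instances: map[string]any{}}
}

// registerFactory takes the write lock: it mutates the factory map.
func (r *registry) registerFactory(name string, f factory) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.factories[name]; ok {
		return errors.New("factory already exists: " + name)
	}
	r.factories[name] = f
	return nil
}

// create looks the factory up under a read lock, runs it with no lock held,
// then takes the write lock only to record the new instance.
func (r *registry) create(instance, factoryName string, raw []byte) (any, error) {
	r.mu.RLock()
	f, ok := r.factories[factoryName]
	r.mu.RUnlock()
	if !ok {
		return nil, errors.New("factory not found: " + factoryName)
	}
	comp, err := f(raw)
	if err != nil {
		return nil, err
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.instances[instance]; exists {
		return nil, errors.New("instance exists: " + instance)
	}
	r.instances[instance] = comp
	return comp, nil
}

// createMany creates n instances from concurrent goroutines and returns the
// number of instances tracked afterwards.
func createMany(r *registry, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_, _ = r.create(fmt.Sprintf("udp-%d", i), "udp", nil)
		}(i)
	}
	wg.Wait()
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.instances)
}

func main() {
	r := newRegistry()
	_ = r.registerFactory("udp", func([]byte) (any, error) { return struct{}{}, nil })
	fmt.Println("instances:", createMany(r, 50))
}
```

Running the factory outside the lock keeps slow constructors from blocking other goroutines. The cost is that a duplicate instance name is only detected after the factory has already run.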
Error Handling ¶
The package defines specific error types for different failure modes:
ErrFactoryAlreadyExists // Attempted to register duplicate factory
ErrInvalidFactory       // Invalid factory registration
ErrFactoryNotFound      // Attempted to create from unknown factory
ErrComponentCreation    // Factory returned error during creation
ErrInstanceExists       // Instance name already in use
ErrInstanceNotFound     // Attempted to access unknown instance
Error checking:
_, err := registry.CreateComponent("instance-1", config, deps)
if errors.Is(err, component.ErrFactoryNotFound) {
	// Handle missing factory - configuration error
}
if errors.Is(err, component.ErrComponentCreation) {
	// Handle factory failure - component-specific error
}
Testing ¶
The explicit registration pattern makes testing straightforward:
// Create isolated test registry
registry := component.NewRegistry()

// Register only components needed for test
if err := input.Register(registry); err != nil {
	t.Fatal(err)
}

// Create test dependencies with mocks
deps := component.Dependencies{
	NATSClient: natsclient.NewTestClient(t),
	Platform: component.PlatformMeta{
		Org:      "test",
		Platform: "test-platform",
	},
	Logger: slog.Default(),
}

// Test component creation
instance, err := registry.CreateComponent("test-1", config, deps)
if err != nil {
	t.Fatal(err)
}

// Verify component behavior through Discoverable interface
assert.Equal(t, "udp", instance.Meta().Type)
assert.True(t, instance.Health().Healthy)
Testing patterns:
- Use real NATS client via natsclient.NewTestClient() for integration tests
- Create isolated registries per test to avoid global state
- Mock external dependencies that cannot be containerized
- Test component behavior through Discoverable interface methods
- Verify factory registration and component creation separately
Performance Considerations ¶
Registry Performance:
- Factory lookup: O(1) with map-based storage
- Component creation: Factory execution time + O(1) registry overhead
- Memory: Components maintain references in Registry until unregistered
- Concurrency: Read-write mutex allows concurrent component creation
Component Lifecycle:
- Components are created on-demand, not pre-instantiated
- Registry holds strong references to created instances
- Memory is released when components are unregistered
- No automatic garbage collection of unused components
Architecture Decisions ¶
Explicit Registration vs init() Self-Registration:
Decision: Use explicit Register() functions called by componentregistry
Benefits:
- Testability: Can create isolated registries without global state
- Explicitness: Clear component dependency graph in componentregistry
- Control: Main application controls what gets registered and when
- No side effects: Package imports don't modify global state
- Deterministic: Registration order is explicit and controllable
Tradeoffs:
- Requires componentregistry orchestration package
- Registration must be explicitly called in main()
- New components must update componentregistry.RegisterAll()
Registry-Based Architecture vs Distributed Catalog:
Decision: Use centralized Registry for component management
Benefits:
- Simpler to reason about and test
- Single source of truth for component management
- Thread-safe operations with minimal overhead
- No network dependencies for component discovery
Dependency Injection via Struct:
Decision: Use Dependencies struct instead of individual parameters
Benefits:
- Avoids parameter proliferation in factory functions
- Easy to add new dependencies without breaking existing factories
- Enables easy testing with mock dependencies
- Follows service architecture patterns
Factory Pattern for Component Creation:
Decision: Service-like constructors that parse their own configuration
Benefits:
- Components handle their own configuration parsing
- Enables flexible configuration validation per component
- Matches service constructor signatures for consistency
- Centralizes configuration knowledge in component packages
Integration Points ¶
Dependencies:
- pkg/natsclient: Required for NATS messaging
- pkg/storage/objectstore: Optional for persistence
- pkg/metric: Optional for Prometheus metrics
- log/slog: Optional for structured logging (defaults to slog.Default())
Used By:
- pkg/service: Manager uses Registry for component lifecycle
- pkg/componentregistry: Orchestrates component registration
- cmd/semstreams: Application entry point creates and populates Registry
Data Flow:
Configuration → Factory Lookup → Factory Execution → Component Instance → Registry
Examples ¶
See component_test.go and registry_test.go for comprehensive examples including:
- Component registration and creation
- Factory validation
- Instance management
- Error handling patterns
- Testing with isolated registries
Package component provides base types and utilities for SemStreams components.
Package component provides port configuration and management for component connections.
Package component provides schema validation and helper functions.
Package component provides schema tag parsing and generation for component configuration.
The schema tag system eliminates duplication between Config structs and ConfigSchema definitions by auto-generating schemas from struct tags. This provides a single source of truth for configuration metadata and follows Go stdlib patterns (similar to json tags).
Basic Usage ¶
Define configuration with schema tags:
type MyConfig struct {
	Name string `json:"name" schema:"type:string,description:Component name,category:basic"`
	Port int    `json:"port" schema:"type:int,description:Port,min:1,max:65535,default:8080"`
}
Generate schema at init time:
var schema = component.GenerateConfigSchema(reflect.TypeOf(MyConfig{}))
Tag Syntax ¶
Tags use comma-separated directives with colon-separated key-value pairs:
- type:string - Field data type (required)
- description:text - Field description (recommended)
- category:basic - UI organization (basic or advanced)
- default:value - Default value
- min:N, max:N - Numeric constraints
- enum:a|b|c - Valid enum values (pipe-separated)
- readonly, editable - Boolean flags for PortDefinition fields
- required, hidden - Boolean flags for validation and UI
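The directive syntax can be illustrated with a small parser. This is a sketch under stated assumptions rather than the package's parser: the `directives` struct and `parseSchemaTag` are hypothetical names, and the sketch splits naively on commas, so a description containing a comma would be cut short.

```go
package main

import (
	"fmt"
	"strings"
)

// directives is a hypothetical holder for one parsed schema tag.
type directives struct {
	Values map[string]string // key:value directives such as type, min, default
	Flags  map[string]bool   // bare directives such as required, readonly
	Enum   []string          // pipe-separated enum values
}

// parseSchemaTag splits a tag on commas, then each directive on its first colon.
func parseSchemaTag(tag string) directives {
	d := directives{Values: map[string]string{}, Flags: map[string]bool{}}
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		key, value, hasValue := strings.Cut(part, ":")
		switch {
		case !hasValue:
			d.Flags[key] = true // no colon: boolean flag
		case key == "enum":
			d.Enum = strings.Split(value, "|")
		default:
			d.Values[key] = value
		}
	}
	return d
}

func main() {
	port := parseSchemaTag("type:int,description:Port,min:1,max:65535,default:8080")
	fmt.Println(port.Values["type"], port.Values["min"], port.Values["max"], port.Values["default"])

	mode := parseSchemaTag("type:enum,enum:a|b|c,required")
	fmt.Println(mode.Enum, mode.Flags["required"])
}
```

Values stay as strings at this stage. Converting `min`, `max`, and `default` to typed values would be a separate step driven by the `type` directive.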
Performance ¶
Schema generation uses reflection but is designed for init-time execution:
- Call GenerateConfigSchema once at package init
- Cache result in package-level variable
- Zero reflection cost at runtime
Error Handling ¶
Invalid tags result in graceful degradation:
- Fields with invalid tags are skipped
- Errors are wrapped with context using pkg/errors
- Missing descriptions use field names as fallback
See docs/architecture/SCHEMA_TAG_SPEC.md for complete specification.
Index ¶
- Constants
- func BenchmarkLifecycleMethods(b *testing.B, factory LifecycleFactory)
- func BuildPayload(domain, category, version string, fields map[string]any) (any, error)
- func CreatePayload(domain, category, version string) any
- func GenerateCacheFieldSchema() map[string]CacheFieldInfo
- func GeneratePortFieldSchema() map[string]PortFieldInfo
- func GetBool(config map[string]any, key string, defaultValue bool) bool
- func GetFloat64(config map[string]any, key string, defaultValue float64) float64
- func GetInt(config map[string]any, key string, defaultValue int) int
- func GetProperties(schema ConfigSchema, category string) map[string]PropertySchema
- func GetPropertyValue(config map[string]any, key string) (any, bool)
- func GetString(config map[string]any, key string, defaultValue string) string
- func IsComplexType(propType string) bool
- func IsLifecycleComponent(comp Discoverable) bool
- func RegisterPayload(registration *PayloadRegistration) error
- func SafeUnmarshal(rawConfig json.RawMessage, target any) error
- func SortedPropertyNames(schema ConfigSchema) []string
- func StandardLifecycleTests(t *testing.T, factory LifecycleFactory)
- func TestErrorInjection(t *testing.T, factory LifecycleFactory)
- func ValidateComponentName(name string) error
- func ValidateConfigKey(key string) error
- func ValidateFactoryConfig(rawConfig json.RawMessage) error
- func ValidateJSONSize(data json.RawMessage) error
- func ValidateNetworkConfig(port int, bindAddr string) error
- func ValidatePortNumber(port int) error
- func WithLogger(logger *slog.Logger) func(*Registry)
- type CacheFieldInfo
- type CapabilityAnnouncement
- type ConfigSchema
- type ConfigValidator
- type ConsumerConfig
- type DebugStatusProvider
- type Dependencies
- type Direction
- type Discoverable
- type ErrorInjectingComponent
- func (e *ErrorInjectingComponent) Initialize() error
- func (e *ErrorInjectingComponent) InjectInitializeError(err error)
- func (e *ErrorInjectingComponent) InjectStartError(err error)
- func (e *ErrorInjectingComponent) InjectStopError(err error)
- func (e *ErrorInjectingComponent) Start(ctx context.Context) error
- func (e *ErrorInjectingComponent) Stop(timeout time.Duration) error
- type Factory
- type FilePort
- type FlowMetrics
- type HealthStatus
- type Info
- type InterfaceContract
- type JetStreamPort
- type KVLifecycleReporter
- func (r *KVLifecycleReporter) ReportCycleComplete(ctx context.Context) error
- func (r *KVLifecycleReporter) ReportCycleError(ctx context.Context, err error) error
- func (r *KVLifecycleReporter) ReportCycleStart(ctx context.Context) error
- func (r *KVLifecycleReporter) ReportStage(ctx context.Context, stage string) error
- type KVWatchPort
- type KVWritePort
- type LifecycleComponent
- type LifecycleFactory
- type LifecycleReporter
- type LifecycleReporterConfig
- type LogEntry
- type LogLevel
- type ManagedComponent
- type Metadata
- type NATSPort
- type NATSRequestPort
- type NATSRequestPortConfig
- type NATSStreamPortConfig
- type NetworkPort
- type NoOpLifecycleReporter
- func (r *NoOpLifecycleReporter) ReportCycleComplete(_ context.Context) error
- func (r *NoOpLifecycleReporter) ReportCycleError(_ context.Context, _ error) error
- func (r *NoOpLifecycleReporter) ReportCycleStart(_ context.Context) error
- func (r *NoOpLifecycleReporter) ReportStage(_ context.Context, _ string) error
- type PayloadBuilder
- type PayloadFactory
- type PayloadRegistration
- type PayloadRegistry
- func (pr *PayloadRegistry) BuildPayload(domain, category, version string, fields map[string]any) (any, error)
- func (pr *PayloadRegistry) CreatePayload(domain, category, version string) any
- func (pr *PayloadRegistry) GetRegistration(msgType string) (*PayloadRegistration, bool)
- func (pr *PayloadRegistry) ListByDomain(domain string) []*PayloadRegistration
- func (pr *PayloadRegistry) ListPayloads() map[string]*PayloadRegistration
- func (pr *PayloadRegistry) RegisterPayload(registration *PayloadRegistration) error
- type PlatformMeta
- type Port
- type PortCapability
- type PortConfig
- type PortDefinition
- type PortFieldInfo
- type Portable
- type ProcessorMetrics
- type PropertySchema
- type Registerable
- type Registration
- type RegistrationConfig
- type Registry
- func (r *Registry) Component(name string) Discoverable
- func (r *Registry) CreateComponent(instanceName string, config types.ComponentConfig, deps Dependencies) (Discoverable, error)
- func (r *Registry) CreatePayload(domain, category, version string) any
- func (r *Registry) GetCapabilities(subjectPattern string) []*CapabilityAnnouncement
- func (r *Registry) GetComponent(name string) (Discoverable, error)
- func (r *Registry) GetComponentSchema(name string) (ConfigSchema, error)
- func (r *Registry) GetFactory(name string) (Factory, bool)
- func (r *Registry) InitNATS(ctx context.Context, client *natsclient.Client, nodeID string) error
- func (r *Registry) ListAvailable() map[string]Info
- func (r *Registry) ListComponentTypes() []string
- func (r *Registry) ListComponents() map[string]Discoverable
- func (r *Registry) ListFactories() map[string]*Registration
- func (r *Registry) ListPayloads() map[string]*PayloadRegistration
- func (r *Registry) RegisterFactory(name string, registration *Registration) error
- func (r *Registry) RegisterInstance(name string, component Discoverable) error
- func (r *Registry) RegisterPayload(registration *PayloadRegistration) error
- func (r *Registry) RegisterWithConfig(config RegistrationConfig) error
- func (r *Registry) StartHeartbeat(ctx context.Context, interval time.Duration)
- func (r *Registry) StopHeartbeat()
- func (r *Registry) SubscribeCapabilities(ctx context.Context, patterns ...string) error
- func (r *Registry) UnregisterInstance(name string)
- func (r *Registry) WaitForCapabilities(ctx context.Context, pattern string, minCount int, timeout time.Duration) error
- type SchemaDirectives
- type SimpleMockComponent
- func (m *SimpleMockComponent) ConfigSchema() ConfigSchema
- func (m *SimpleMockComponent) DataFlow() FlowMetrics
- func (m *SimpleMockComponent) Health() HealthStatus
- func (m *SimpleMockComponent) InputPorts() []Port
- func (m *SimpleMockComponent) Meta() Metadata
- func (m *SimpleMockComponent) OutputPorts() []Port
- type State
- type Status
- type ThrottledLifecycleReporter
- func (r *ThrottledLifecycleReporter) FlushPending(ctx context.Context) error
- func (r *ThrottledLifecycleReporter) ReportCycleComplete(ctx context.Context) error
- func (r *ThrottledLifecycleReporter) ReportCycleError(ctx context.Context, err error) error
- func (r *ThrottledLifecycleReporter) ReportCycleStart(ctx context.Context) error
- func (r *ThrottledLifecycleReporter) ReportStage(ctx context.Context, stage string) error
- type TimerPort
- type Validatable
- type ValidationError
Constants ¶
const (
	MaxStringLength = 1024          // Maximum length for string values
	MaxJSONSize     = 1024 * 1024   // Maximum JSON size (1MB)
	MinPort         = 1             // Minimum valid port number
	MaxPort         = 65535         // Maximum valid port number
	MaxInt          = math.MaxInt32 // Maximum safe integer value
	MinInt          = math.MinInt32 // Minimum safe integer value
)
Config validation constants - security limits
Variables ¶
This section is empty.
Functions ¶
func BenchmarkLifecycleMethods ¶
func BenchmarkLifecycleMethods(b *testing.B, factory LifecycleFactory)
BenchmarkLifecycleMethods provides benchmark tests for lifecycle operations
func BuildPayload ¶
BuildPayload creates a typed payload from field mappings using the globally registered builder. Returns an error if the payload type is not registered or if the builder fails. This is used by workflow variable interpolation to construct typed payloads from step output maps. Returns any to avoid import cycles - the actual payload implements message.Payload.
func CreatePayload ¶
CreatePayload creates a payload instance using the globally registered factory. Returns nil if no factory is registered for the given type.
func GenerateCacheFieldSchema ¶
func GenerateCacheFieldSchema() map[string]CacheFieldInfo
GenerateCacheFieldSchema generates metadata for cache.Config fields. This describes which fields in cache.Config are editable and their constraints, enabling the UI to render appropriate controls for cache configuration.
The function examines cache.Config struct tags to determine:
- Field types (for appropriate UI controls)
- Editability (whether users can modify the field)
- Enum values (for strategy field)
- Numeric constraints (for size limits)
All cache.Config fields are marked as "editable" to allow runtime configuration.
This metadata is included in ConfigSchema for fields with type "cache", allowing the frontend to correctly render cache configuration forms.
Returns:
- Map of field names to CacheFieldInfo with type, editability, and constraint metadata
func GeneratePortFieldSchema ¶
func GeneratePortFieldSchema() map[string]PortFieldInfo
GeneratePortFieldSchema generates metadata for PortDefinition fields. This describes which fields in PortDefinition are editable vs read-only, enabling the UI to render appropriate controls for port configuration.
The function examines PortDefinition struct tags to determine:
- Field types (for appropriate UI controls)
- Editability (whether users can modify the field)
Fields marked with "editable" tag are user-modifiable (e.g., Subject, Timeout). Fields marked with "readonly" tag are display-only (e.g., Name, Type). Fields without schema tags default to read-only string type.
This metadata is included in ConfigSchema for fields with type "ports", allowing the frontend to correctly render port configuration forms.
Returns:
- Map of field names to PortFieldInfo with type and editability metadata
func GetBool ¶
GetBool safely extracts a boolean value from config with a default fallback and validation
func GetFloat64 ¶
GetFloat64 safely extracts a float64 value from config with a default fallback and validation
func GetInt ¶
GetInt safely extracts an integer value from config with a default fallback and bounds checking
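A typed getter with a default and bounds check might look like the sketch below. It is an assumption about the behaviour, not the package's code: `getInt` is a hypothetical name, and returning the default for out-of-range or non-integral values (rather than clamping or rounding) is a guess. The 32-bit bounds mirror the MaxInt and MinInt constants listed above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// getInt returns config[key] as an int, or def when the key is missing,
// has the wrong type, is not a whole number, or falls outside int32 range.
func getInt(config map[string]any, key string, def int) int {
	v, ok := config[key]
	if !ok {
		return def
	}
	switch n := v.(type) {
	case int:
		if n < math.MinInt32 || n > math.MaxInt32 {
			return def
		}
		return n
	case float64: // encoding/json decodes numbers into float64 for map[string]any
		if n != math.Trunc(n) || n < math.MinInt32 || n > math.MaxInt32 {
			return def
		}
		return int(n)
	}
	return def
}

func main() {
	var config map[string]any
	raw := []byte(`{"port": 8080, "ratio": 1.5, "name": "udp", "huge": 1e12}`)
	if err := json.Unmarshal(raw, &config); err != nil {
		panic(err)
	}
	fmt.Println(getInt(config, "port", 14550))    // whole JSON number
	fmt.Println(getInt(config, "ratio", 14550))   // fractional: default
	fmt.Println(getInt(config, "name", 14550))    // wrong type: default
	fmt.Println(getInt(config, "huge", 14550))    // out of bounds: default
	fmt.Println(getInt(config, "missing", 14550)) // absent: default
}
```

The `float64` case matters in practice: a config decoded from JSON never contains Go `int` values, so a getter that only asserts `int` would silently return the default every time.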
func GetProperties ¶
func GetProperties(schema ConfigSchema, category string) map[string]PropertySchema
GetProperties filters schema properties by category for UI organization.
Components can categorize their configuration properties as "basic" (shown by default) or "advanced" (hidden in collapsible section). This function extracts properties belonging to a specific category.
Parameters:
- schema: The component's configuration schema
- category: Filter by "basic" or "advanced", or empty string for all properties
Properties without an explicit Category field default to "advanced".
Returns a map of property names to PropertySchema definitions matching the category.
Example:
schema := component.ConfigSchema{
	Properties: map[string]component.PropertySchema{
		"port":        {Type: "int", Category: "basic"},
		"buffer_size": {Type: "int", Category: "advanced"},
		"timeout":     {Type: "int"}, // Defaults to "advanced"
	},
}

basicProps := component.GetProperties(schema, "basic")
// Returns: map["port": {...}]

advancedProps := component.GetProperties(schema, "advanced")
// Returns: map["buffer_size": {...}, "timeout": {...}]
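The filtering rule described above (an unset category counts as "advanced", and an empty filter returns everything) can be sketched as follows. The types are reduced stand-ins and `getProperties` is a hypothetical reimplementation, not the package's source.

```go
package main

import "fmt"

// Reduced stand-ins for the package's schema types (assumption).
type PropertySchema struct {
	Type     string
	Category string
}

type ConfigSchema struct {
	Properties map[string]PropertySchema
}

// getProperties returns the properties whose effective category matches.
// An empty category argument matches every property.
func getProperties(schema ConfigSchema, category string) map[string]PropertySchema {
	out := make(map[string]PropertySchema)
	for name, prop := range schema.Properties {
		effective := prop.Category
		if effective == "" {
			effective = "advanced" // unset category defaults to advanced
		}
		if category == "" || category == effective {
			out[name] = prop
		}
	}
	return out
}

func main() {
	schema := ConfigSchema{Properties: map[string]PropertySchema{
		"port":        {Type: "int", Category: "basic"},
		"buffer_size": {Type: "int", Category: "advanced"},
		"timeout":     {Type: "int"},
	}}
	fmt.Println(len(getProperties(schema, "basic")))    // port
	fmt.Println(len(getProperties(schema, "advanced"))) // buffer_size, timeout
	fmt.Println(len(getProperties(schema, "")))         // all three
}
```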
func GetPropertyValue ¶
GetPropertyValue safely extracts a property value from a configuration map.
Returns the value and true if the key exists, or nil and false if the key is not present in the map. This function is nil-safe - passing a nil config will return (nil, false).
Example:
config := map[string]any{"port": 8080, "host": "localhost"}
if port, exists := component.GetPropertyValue(config, "port"); exists {
	fmt.Printf("Port: %v\n", port)
}
func GetString ¶
GetString safely extracts a string value from config with a default fallback and validation
func IsComplexType ¶
IsComplexType returns true if a property type requires complex rendering.
Complex types (object, array) cannot be rendered as simple form inputs and require specialized UI components like JSON editors or nested form builders.
Currently identifies "object" and "array" types as complex. In MVP, the UI falls back to a JSON editor for these types.
Example:
if component.IsComplexType(propSchema.Type) {
	// Use JSON editor fallback
	renderJSONEditor(propSchema)
} else {
	// Render type-specific input field
	renderInputField(propSchema)
}
func IsLifecycleComponent ¶
func IsLifecycleComponent(comp Discoverable) bool
IsLifecycleComponent checks if a component supports lifecycle management
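A check like this is typically a single type assertion. The sketch below assumes the lifecycle methods match those shown on ErrorInjectingComponent in the index (Initialize, Start, Stop). The `lifecycle` interface, the one-method `Discoverable` stand-in, and both component types are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Discoverable is trimmed to one method for this sketch (assumption).
type Discoverable interface{ Name() string }

// lifecycle is a hypothetical stand-in for LifecycleComponent.
type lifecycle interface {
	Initialize() error
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

// passive only describes itself; it has no lifecycle methods.
type passive struct{}

func (passive) Name() string { return "passive" }

// active adds lifecycle methods on top of the embedded passive component.
type active struct{ passive }

func (*active) Initialize() error           { return nil }
func (*active) Start(context.Context) error { return nil }
func (*active) Stop(time.Duration) error    { return nil }

// isLifecycle reports whether comp also implements the lifecycle methods.
func isLifecycle(comp Discoverable) bool {
	_, ok := comp.(lifecycle)
	return ok
}

func main() {
	fmt.Println(isLifecycle(passive{}), isLifecycle(&active{}))
}
```

Keeping lifecycle support as an optional interface lets simple components implement only Discoverable while a manager starts and stops the ones that opt in.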
func RegisterPayload ¶
func RegisterPayload(registration *PayloadRegistration) error
RegisterPayload registers a payload factory globally. This allows typed payloads to be recreated during message deserialization. Payloads use init() registration as they are data types, not lifecycle components.
func SafeUnmarshal ¶
func SafeUnmarshal(rawConfig json.RawMessage, target any) error
SafeUnmarshal performs validated unmarshaling into a target struct. It validates the JSON first, then unmarshals with additional type checking.
func SortedPropertyNames ¶
func SortedPropertyNames(schema ConfigSchema) []string
SortedPropertyNames returns property names in UI display order.
Properties are sorted by:
1. Category: "basic" properties first, then "advanced" properties
2. Alphabetically within each category
This ensures consistent, predictable ordering in configuration UIs. Properties without an explicit Category default to "advanced".
Example:
schema := component.ConfigSchema{
	Properties: map[string]component.PropertySchema{
		"port":         {Category: "basic"},
		"bind_address": {Category: "basic"},
		"buffer_size":  {Category: "advanced"},
		"timeout":      {}, // Defaults to "advanced"
	},
}

names := component.SortedPropertyNames(schema)
// Returns: ["bind_address", "port", "buffer_size", "timeout"]
//           ^-- basic (alpha) --^   ^-- advanced (alpha) --^
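The two-level ordering can be expressed as a single comparison function. This is a sketch with reduced stand-in types and a hypothetical `sortedPropertyNames`. It follows the ordering rule described above but is not the package's source.

```go
package main

import (
	"fmt"
	"sort"
)

// Reduced stand-ins for the package's schema types (assumption).
type PropertySchema struct{ Category string }

type ConfigSchema struct {
	Properties map[string]PropertySchema
}

// sortedPropertyNames orders names by category rank, then alphabetically.
func sortedPropertyNames(schema ConfigSchema) []string {
	names := make([]string, 0, len(schema.Properties))
	for name := range schema.Properties {
		names = append(names, name)
	}
	// rank puts "basic" first; "advanced" and unset categories share rank 1.
	rank := func(name string) int {
		if schema.Properties[name].Category == "basic" {
			return 0
		}
		return 1
	}
	sort.Slice(names, func(i, j int) bool {
		ri, rj := rank(names[i]), rank(names[j])
		if ri != rj {
			return ri < rj
		}
		return names[i] < names[j]
	})
	return names
}

func main() {
	schema := ConfigSchema{Properties: map[string]PropertySchema{
		"port":         {Category: "basic"},
		"bind_address": {Category: "basic"},
		"buffer_size":  {Category: "advanced"},
		"timeout":      {},
	}}
	fmt.Println(sortedPropertyNames(schema))
	// prints: [bind_address port buffer_size timeout]
}
```

Sorting is needed at all because Go randomizes map iteration order. Without it, a UI built from the `Properties` map would reorder fields between renders.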
func StandardLifecycleTests ¶
func StandardLifecycleTests(t *testing.T, factory LifecycleFactory)
StandardLifecycleTests runs comprehensive lifecycle tests for any component that implements LifecycleComponent. This ensures consistent testing standards across all components in the SemStreams system.
func TestErrorInjection ¶
func TestErrorInjection(t *testing.T, factory LifecycleFactory)
TestErrorInjection tests components with injected errors
func ValidateComponentName ¶
ValidateComponentName validates component/instance names for security
func ValidateConfigKey ¶
ValidateConfigKey checks if a configuration key is valid
func ValidateFactoryConfig ¶
func ValidateFactoryConfig(rawConfig json.RawMessage) error
ValidateFactoryConfig performs validation before the configuration is passed to a factory. This is the main security gate for all component configurations.
func ValidateJSONSize ¶
func ValidateJSONSize(data json.RawMessage) error
ValidateJSONSize checks if JSON input is within safe limits
func ValidateNetworkConfig ¶
ValidateNetworkConfig validates network configuration including port and bind address
func ValidatePortNumber ¶
ValidatePortNumber validates port numbers are within valid range
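The two network validators can be pictured as a range check plus an address check. The sketch below is an assumption about their behaviour: `validatePort` and `validateNetwork` are hypothetical names, the bounds mirror the MinPort and MaxPort constants above, and requiring the bind address to be an IP literal (rather than also accepting hostnames) is a simplification.

```go
package main

import (
	"fmt"
	"net"
)

const (
	minPort = 1     // mirrors MinPort
	maxPort = 65535 // mirrors MaxPort
)

// validatePort checks that port lies in [minPort, maxPort].
func validatePort(port int) error {
	if port < minPort || port > maxPort {
		return fmt.Errorf("port %d out of range [%d, %d]", port, minPort, maxPort)
	}
	return nil
}

// validateNetwork checks the port, then requires an IPv4 or IPv6 literal.
func validateNetwork(port int, bindAddr string) error {
	if err := validatePort(port); err != nil {
		return err
	}
	if net.ParseIP(bindAddr) == nil {
		return fmt.Errorf("bind address %q is not a valid IP address", bindAddr)
	}
	return nil
}

func main() {
	fmt.Println(validateNetwork(14550, "0.0.0.0"))
	fmt.Println(validateNetwork(0, "0.0.0.0"))
	fmt.Println(validateNetwork(14550, "not-an-ip"))
}
```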
func WithLogger ¶
WithLogger sets a custom logger for the registry
Types ¶
type CacheFieldInfo ¶
type CacheFieldInfo struct {
Type string `json:"type"`
Editable bool `json:"editable"`
Enum []string `json:"enum,omitempty"` // For strategy field
Min *int `json:"min,omitempty"` // For numeric fields
}
CacheFieldInfo describes metadata for cache.Config fields
type CapabilityAnnouncement ¶
type CapabilityAnnouncement struct {
InstanceName string `json:"instance_name"`
Component string `json:"component"`
Type string `json:"type"`
Version string `json:"version"`
InputPorts []PortCapability `json:"input_ports,omitempty"`
OutputPorts []PortCapability `json:"output_ports,omitempty"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
NodeID string `json:"node_id"`
}
CapabilityAnnouncement is published to NATS when components register.
type ConfigSchema ¶
type ConfigSchema struct {
Properties map[string]PropertySchema `json:"properties"`
Required []string `json:"required"`
}
ConfigSchema describes the configuration parameters for a component
func GenerateConfigSchema ¶
func GenerateConfigSchema(configType reflect.Type) ConfigSchema
GenerateConfigSchema generates a ConfigSchema from a struct type using reflection. This function performs one-time reflection at initialization to extract schema metadata from struct field tags, eliminating the need for manual schema definitions.
Usage Pattern:
type MyComponentConfig struct {
Name string `json:"name" schema:"type:string,description:Name,category:basic"`
Port int `json:"port" schema:"type:int,description:Port,min:1,max:65535"`
}
var schema = component.GenerateConfigSchema(reflect.TypeOf(MyComponentConfig{}))
Field Processing:
- Only exported fields with both 'json' and 'schema' tags are included
- json:"-" fields are skipped
- Fields without schema tags are skipped
- Invalid schema tags result in skipped fields (graceful degradation)
Special Handling:
- Fields with type "ports" automatically include PortFieldSchema metadata
- Default values are converted from strings to appropriate types
- Required fields are added to the schema's Required list
Performance:
- Call once at init() time - reflection cost is paid only once
- Generated schemas are cached in package-level variables
- Zero reflection overhead at runtime
Parameters:
- configType: The reflect.Type of the config struct (use reflect.TypeOf(ConfigStruct{})). Pointer types are automatically dereferenced.
Returns:
- ConfigSchema with Properties map and Required list populated from struct tags
- Empty schema for non-struct types
Example ¶
ExampleGenerateConfigSchema demonstrates how to use schema tags to auto-generate configuration schemas from struct definitions.
package main
import (
"encoding/json"
"fmt"
"reflect"
"github.com/c360studio/semstreams/component"
)
func main() {
// Define a configuration struct with schema tags
type ComponentConfig struct {
// Basic configuration
Name string `json:"name" schema:"type:string,description:Component name,category:basic"`
Port int `json:"port" schema:"type:int,description:Listen port,min:1,max:65535,default:8080,category:basic"`
Enabled bool `json:"enabled" schema:"type:bool,description:Enable component,default:true,category:basic"`
// Advanced configuration
Timeout string `json:"timeout" schema:"type:string,description:Request timeout,default:30s,category:advanced"`
LogLevel string `json:"log_level" schema:"type:enum,description:Logging level,enum:debug|info|warn|error,default:info,category:advanced"`
// Required field
APIKey string `json:"api_key" schema:"required,type:string,description:Authentication API key"`
}
// Generate the schema at init time (one-time reflection cost)
schema := component.GenerateConfigSchema(reflect.TypeOf(ComponentConfig{}))
// The generated schema can be used for validation, UI generation, etc.
schemaJSON, _ := json.MarshalIndent(schema, "", " ")
fmt.Println(string(schemaJSON))
// Output will show the generated schema with all properties
}
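To make the field-processing rules above more concrete, here is a rough sketch of how a schema tag could be read with reflection. It is not the package's parser: parseSchemaTag and describeFields are hypothetical helpers, and the sketch assumes tag values never contain commas.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// parseSchemaTag splits a tag such as "type:int,min:1,max:65535" into
// key/value pairs. Bare flags such as "required" get an empty value.
func parseSchemaTag(tag string) map[string]string {
	out := make(map[string]string)
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		key, value, _ := strings.Cut(part, ":")
		out[key] = value
	}
	return out
}

// describeFields returns the parsed schema tag of every exported field that
// has both a usable json name and a schema tag, keyed by the json name.
// Pointers are dereferenced; non-struct values yield an empty map.
func describeFields(v any) map[string]map[string]string {
	result := make(map[string]map[string]string)
	t := reflect.TypeOf(v)
	if t == nil {
		return result
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return result
	}
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		if !field.IsExported() {
			continue
		}
		name, _, _ := strings.Cut(field.Tag.Get("json"), ",")
		schemaTag, hasSchema := field.Tag.Lookup("schema")
		if name == "" || name == "-" || !hasSchema {
			continue
		}
		result[name] = parseSchemaTag(schemaTag)
	}
	return result
}

func main() {
	type config struct {
		Port   int    `json:"port" schema:"type:int,description:Listen port,min:1,max:65535"`
		APIKey string `json:"api_key" schema:"required,type:string"`
		Notes  string `json:"notes"` // no schema tag: skipped
	}
	fields := describeFields(config{})
	fmt.Println(fields["port"]["max"], fields["api_key"]["type"], len(fields))
}
```

Because the struct type never changes at runtime, a result like this can be computed once and stored in a package-level variable, which matches the performance notes above.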
type ConfigValidator ¶
type ConfigValidator struct {
// contains filtered or unexported fields
}
ConfigValidator provides secure validation for component configurations
func NewConfigValidator ¶
func NewConfigValidator() *ConfigValidator
NewConfigValidator creates a validator with secure defaults
func (*ConfigValidator) ValidateConfig ¶
func (v *ConfigValidator) ValidateConfig(rawConfig json.RawMessage) error
ValidateConfig performs comprehensive validation on raw JSON config. This prevents injection attacks, resource exhaustion, and malformed input.
type ConsumerConfig ¶
ConsumerConfig holds extracted JetStream consumer configuration.
func GetConsumerConfig ¶
func GetConsumerConfig(port Port) ConsumerConfig
GetConsumerConfig extracts JetStream consumer configuration from a port. Returns safe defaults if the port doesn't have JetStream config:
- DeliverPolicy: "new" (safe default - don't replay historical messages)
- AckPolicy: "explicit"
- MaxDeliver: 3
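The defaulting behavior can be illustrated with a small sketch. The ConsumerConfig fields are not listed on this page, so the struct below is an assumption that mirrors the three documented defaults; consumerConfigFor is a hypothetical helper that takes a trimmed JetStreamPort rather than a Port.

```go
package main

import "fmt"

// JetStreamPort is a trimmed stand-in holding only the consumer settings.
type JetStreamPort struct {
	DeliverPolicy string
	AckPolicy     string
	MaxDeliver    int
}

// ConsumerConfig is an assumed shape; the real struct may carry more fields.
type ConsumerConfig struct {
	DeliverPolicy string
	AckPolicy     string
	MaxDeliver    int
}

// consumerConfigFor starts from the documented safe defaults and overrides
// only the values the port sets explicitly. A nil port yields the defaults.
func consumerConfigFor(js *JetStreamPort) ConsumerConfig {
	cfg := ConsumerConfig{DeliverPolicy: "new", AckPolicy: "explicit", MaxDeliver: 3}
	if js == nil {
		return cfg
	}
	if js.DeliverPolicy != "" {
		cfg.DeliverPolicy = js.DeliverPolicy
	}
	if js.AckPolicy != "" {
		cfg.AckPolicy = js.AckPolicy
	}
	if js.MaxDeliver > 0 {
		cfg.MaxDeliver = js.MaxDeliver
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", consumerConfigFor(nil))
	fmt.Printf("%+v\n", consumerConfigFor(&JetStreamPort{DeliverPolicy: "all", MaxDeliver: 5}))
}
```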
func GetConsumerConfigFromDefinition ¶
func GetConsumerConfigFromDefinition(portDef PortDefinition) ConsumerConfig
GetConsumerConfigFromDefinition extracts JetStream consumer configuration from a port definition. This is a convenience wrapper for use with PortDefinition instead of Port.
type DebugStatusProvider ¶
type DebugStatusProvider interface {
// DebugStatus returns extended debug information for the component.
// The returned value should be JSON-serializable.
DebugStatus() any
}
DebugStatusProvider is an optional interface for components that can provide extended debug information beyond basic health status.
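Optional interfaces like this one are usually detected with a type assertion. The sketch below shows that pattern under stated assumptions: udpInput, plainComponent, and debugJSON are hypothetical names, and only the DebugStatusProvider interface itself is taken from this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DebugStatusProvider mirrors the optional interface documented above.
type DebugStatusProvider interface {
	DebugStatus() any
}

// udpInput is a hypothetical component that exposes extra debug data.
type udpInput struct{ packets int }

func (u *udpInput) DebugStatus() any {
	return map[string]any{"packets_received": u.packets}
}

// plainComponent does not implement the optional interface.
type plainComponent struct{}

// debugJSON returns the JSON-encoded debug status when the component
// provides one, and false when it does not (or encoding fails).
func debugJSON(component any) (string, bool) {
	provider, ok := component.(DebugStatusProvider)
	if !ok {
		return "", false
	}
	data, err := json.Marshal(provider.DebugStatus())
	if err != nil {
		return "", false
	}
	return string(data), true
}

func main() {
	fmt.Println(debugJSON(&udpInput{packets: 7}))
	fmt.Println(debugJSON(plainComponent{}))
}
```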
type Dependencies ¶
type Dependencies struct {
NATSClient *natsclient.Client // NATS client for messaging
MetricsRegistry *metric.MetricsRegistry // Metrics registry for Prometheus (can be nil)
Logger *slog.Logger // Structured logger (can be nil, defaults to slog.Default())
Platform PlatformMeta // Platform identity (organization and platform)
Security security.Config // Platform-wide security configuration
ModelRegistry model.RegistryReader // Unified model registry (can be nil)
}
Dependencies provides all external dependencies needed by components. This structure follows the same pattern as service dependencies, enabling components to receive properly structured dependencies rather than individual fields.
func (*Dependencies) GetLogger ¶
func (d *Dependencies) GetLogger() *slog.Logger
GetLogger returns the configured logger or a default logger if none is provided
func (*Dependencies) GetLoggerWithComponent ¶
func (d *Dependencies) GetLoggerWithComponent(componentName string) *slog.Logger
GetLoggerWithComponent returns a logger configured with component context
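A nil-safe logger accessor might look like the sketch below. The Dependencies struct here is a trimmed stand-in, and the "component" attribute key is an assumption; the page does not say which key the package uses.

```go
package main

import (
	"log/slog"
	"os"
)

// Dependencies is a trimmed stand-in that keeps only the Logger field.
type Dependencies struct {
	Logger *slog.Logger
}

// GetLogger falls back to slog.Default() when no logger was supplied,
// including when the receiver itself is nil.
func (d *Dependencies) GetLogger() *slog.Logger {
	if d == nil || d.Logger == nil {
		return slog.Default()
	}
	return d.Logger
}

// GetLoggerWithComponent attaches the component name as a structured
// attribute. The key "component" is an assumption made for this sketch.
func (d *Dependencies) GetLoggerWithComponent(componentName string) *slog.Logger {
	return d.GetLogger().With("component", componentName)
}

func main() {
	var deps Dependencies // Logger left nil on purpose
	deps.GetLoggerWithComponent("udp").Info("using the default logger")

	deps.Logger = slog.New(slog.NewTextHandler(os.Stdout, nil))
	deps.GetLoggerWithComponent("udp").Info("using the configured logger")
}
```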
type Discoverable ¶
type Discoverable interface {
// Meta returns basic component information
Meta() Metadata
// InputPorts returns the ports this component accepts data on
InputPorts() []Port
// OutputPorts returns the ports this component produces data on
OutputPorts() []Port
// ConfigSchema returns the configuration schema for this component
ConfigSchema() ConfigSchema
// Health returns current health status
Health() HealthStatus
// DataFlow returns current data flow metrics
DataFlow() FlowMetrics
}
Discoverable defines the interface for components that can be discovered and inspected by the management layer. This interface enables dynamic discovery of component capabilities, configuration, and health status.
Components implementing this interface can be:
- Input components: Accept external data (UDP, TCP, HTTP)
- Processor components: Transform data (plugins)
- Output components: Send data to external systems
- Storage components: Store and retrieve data (ObjectStore, KV)
type ErrorInjectingComponent ¶
type ErrorInjectingComponent struct {
LifecycleComponent
// contains filtered or unexported fields
}
ErrorInjectingComponent wraps a component to inject errors for testing
func NewErrorInjectingComponent ¶
func NewErrorInjectingComponent(comp LifecycleComponent) *ErrorInjectingComponent
NewErrorInjectingComponent creates a component wrapper that can inject errors for testing
func (*ErrorInjectingComponent) Initialize ¶
func (e *ErrorInjectingComponent) Initialize() error
Initialize initializes the component, returning injected error if configured
func (*ErrorInjectingComponent) InjectInitializeError ¶
func (e *ErrorInjectingComponent) InjectInitializeError(err error)
InjectInitializeError configures the component to return an error on Initialize
func (*ErrorInjectingComponent) InjectStartError ¶
func (e *ErrorInjectingComponent) InjectStartError(err error)
InjectStartError configures the component to return an error on Start
func (*ErrorInjectingComponent) InjectStopError ¶
func (e *ErrorInjectingComponent) InjectStopError(err error)
InjectStopError configures the component to return an error on Stop
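The wrapper relies on interface embedding: every method is forwarded to the wrapped component unless the wrapper overrides it. The following sketch shows the idea for Initialize only. Lifecycle, okComponent, and errorInjecting are simplified, hypothetical stand-ins rather than the package types.

```go
package main

import (
	"errors"
	"fmt"
)

// Lifecycle is a one-method stand-in for LifecycleComponent.
type Lifecycle interface {
	Initialize() error
}

// okComponent always initializes successfully and records that it ran.
type okComponent struct{ initialized bool }

func (c *okComponent) Initialize() error {
	c.initialized = true
	return nil
}

// errorInjecting embeds the wrapped component, so methods it does not
// override are forwarded automatically.
type errorInjecting struct {
	Lifecycle
	initErr error
}

// InjectInitializeError makes the next Initialize calls fail with err.
// Passing nil removes the injected error again.
func (e *errorInjecting) InjectInitializeError(err error) { e.initErr = err }

// Initialize returns the injected error if one is set; otherwise it
// delegates to the wrapped component.
func (e *errorInjecting) Initialize() error {
	if e.initErr != nil {
		return e.initErr
	}
	return e.Lifecycle.Initialize()
}

var errInjected = errors.New("injected failure")

func main() {
	inner := &okComponent{}
	wrapped := &errorInjecting{Lifecycle: inner}

	wrapped.InjectInitializeError(errInjected)
	fmt.Println(wrapped.Initialize(), inner.initialized)

	wrapped.InjectInitializeError(nil)
	fmt.Println(wrapped.Initialize(), inner.initialized)
}
```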
type Factory ¶
type Factory func(rawConfig json.RawMessage, deps Dependencies) (Discoverable, error)
Factory creates a component instance from configuration, following the service pattern. The factory function receives raw JSON configuration and dependencies, parses its own config, and returns a properly initialized component that implements the Discoverable interface. All I/O operations should be performed in the component's Start() method, not in the factory. This pattern matches service constructors: func(rawConfig json.RawMessage, deps Dependencies) (Service, error)
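A factory in this style might look roughly like the sketch below. Everything except the overall shape of the signature is an assumption: Dependencies and Component are local stand-ins (the real factory returns Discoverable), and udpConfig, its default values, and createUDPInput are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Dependencies and Component are local stand-ins for the package types.
type Dependencies struct{}

type Component interface{ Name() string }

// udpConfig is a hypothetical config struct owned by the component.
type udpConfig struct {
	Port        int    `json:"port"`
	BindAddress string `json:"bind_address"`
}

type udpInput struct{ cfg udpConfig }

func (u *udpInput) Name() string { return "udp" }

// createUDPInput parses its own configuration and validates it, but opens
// no socket: I/O is deferred to the component's Start method.
func createUDPInput(rawConfig json.RawMessage, _ Dependencies) (Component, error) {
	cfg := udpConfig{Port: 14550, BindAddress: "0.0.0.0"} // illustrative defaults
	if len(rawConfig) > 0 {
		if err := json.Unmarshal(rawConfig, &cfg); err != nil {
			return nil, fmt.Errorf("parse udp config: %w", err)
		}
	}
	if cfg.Port < 1 || cfg.Port > 65535 {
		return nil, fmt.Errorf("invalid port %d", cfg.Port)
	}
	return &udpInput{cfg: cfg}, nil
}

func main() {
	comp, err := createUDPInput(json.RawMessage(`{"port": 9000}`), Dependencies{})
	fmt.Println(comp.Name(), err)

	_, err = createUDPInput(json.RawMessage(`{"port": "nope"}`), Dependencies{})
	fmt.Println(err)
}
```

Keeping the factory free of I/O means a registry can construct and inspect components cheaply, and failures such as an unavailable socket surface at Start instead.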
type FilePort ¶
FilePort - File system access
func (FilePort) IsExclusive ¶
IsExclusive returns false as multiple components can read files
func (FilePort) ResourceID ¶
ResourceID returns unique identifier for file ports
type FlowMetrics ¶
type FlowMetrics struct {
MessagesPerSecond float64 `json:"messages_per_second"`
BytesPerSecond float64 `json:"bytes_per_second"`
ErrorRate float64 `json:"error_rate"`
LastActivity time.Time `json:"last_activity"`
}
FlowMetrics describes the current data flow through a component
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
LastCheck time.Time `json:"last_check"`
ErrorCount int `json:"error_count"`
LastError string `json:"last_error,omitempty"`
Uptime time.Duration `json:"uptime"`
Status string `json:"status"`
}
HealthStatus describes the current health state of a component
type Info ¶
type Info struct {
Type string `json:"type"` // "input", "processor", "output", "storage"
Protocol string `json:"protocol"` // Technical protocol (udp, tcp, mavlink, etc.)
Domain string `json:"domain"` // Business domain (robotics, semantic, network, storage)
Description string `json:"description"` // Human-readable description
Version string `json:"version"` // Component version
}
Info holds metadata about an available component type
type InterfaceContract ¶
type InterfaceContract struct {
Type string `json:"type"` // e.g., "message.Storable"
Version string `json:"version,omitempty"` // e.g., "v1"
Compatible []string `json:"compatible,omitempty"` // Also accepts these
}
InterfaceContract defines expected message interface
type JetStreamPort ¶
type JetStreamPort struct {
// Stream configuration (for outputs)
StreamName string `json:"stream_name"` // e.g., "ENTITY_EVENTS"
Subjects []string `json:"subjects"` // e.g., ["events.graph.entity.>"]
Storage string `json:"storage,omitempty"` // "file" or "memory", default "file"
RetentionPolicy string `json:"retention,omitempty"` // "limits", "interest", "work_queue", default "limits"
RetentionDays int `json:"retention_days,omitempty"` // Message retention in days, default 7
MaxSizeGB int `json:"max_size_gb,omitempty"` // Max stream size in GB, default 10
Replicas int `json:"replicas,omitempty"` // Number of replicas, default 1
// Consumer configuration (for inputs)
ConsumerName string `json:"consumer_name,omitempty"` // Durable consumer name
DeliverPolicy string `json:"deliver_policy,omitempty"` // "all", "last", "new", default "new"
AckPolicy string `json:"ack_policy,omitempty"` // "explicit", "none", "all", default "explicit"
MaxDeliver int `json:"max_deliver,omitempty"` // Max redelivery attempts, default 3
// Interface contract
Interface *InterfaceContract `json:"interface,omitempty"`
}
JetStreamPort - NATS JetStream for durable, at-least-once messaging
func (JetStreamPort) IsExclusive ¶
func (j JetStreamPort) IsExclusive() bool
IsExclusive returns false as JetStream manages consumer coordination
func (JetStreamPort) ResourceID ¶
func (j JetStreamPort) ResourceID() string
ResourceID returns unique identifier for JetStream ports
func (JetStreamPort) Type ¶
func (j JetStreamPort) Type() string
Type returns the port type identifier
type KVLifecycleReporter ¶
type KVLifecycleReporter struct {
// contains filtered or unexported fields
}
KVLifecycleReporter implements LifecycleReporter using NATS KV storage. It writes component status to the COMPONENT_STATUS bucket.
func NewKVLifecycleReporter ¶
func NewKVLifecycleReporter(kv jetstream.KeyValue, componentName string, logger *slog.Logger) *KVLifecycleReporter
NewKVLifecycleReporter creates a new lifecycle reporter that writes to NATS KV.
func (*KVLifecycleReporter) ReportCycleComplete ¶
func (r *KVLifecycleReporter) ReportCycleComplete(ctx context.Context) error
ReportCycleComplete marks successful cycle completion.
func (*KVLifecycleReporter) ReportCycleError ¶
func (r *KVLifecycleReporter) ReportCycleError(ctx context.Context, err error) error
ReportCycleError marks cycle failure with error details.
func (*KVLifecycleReporter) ReportCycleStart ¶
func (r *KVLifecycleReporter) ReportCycleStart(ctx context.Context) error
ReportCycleStart marks the beginning of a new processing cycle.
func (*KVLifecycleReporter) ReportStage ¶
func (r *KVLifecycleReporter) ReportStage(ctx context.Context, stage string) error
ReportStage updates the component's current processing stage.
type KVWatchPort ¶
type KVWatchPort struct {
Bucket string `json:"bucket"` // e.g., "ENTITY_STATES"
Keys []string `json:"keys,omitempty"` // Keys to watch, empty = all
History bool `json:"history,omitempty"` // Include historical values
Interface *InterfaceContract `json:"interface,omitempty"`
}
KVWatchPort - NATS KV Watch for state observation
func (KVWatchPort) IsExclusive ¶
func (k KVWatchPort) IsExclusive() bool
IsExclusive returns false as multiple watchers are allowed
func (KVWatchPort) ResourceID ¶
func (k KVWatchPort) ResourceID() string
ResourceID returns unique identifier for KV watch ports
type KVWritePort ¶
type KVWritePort struct {
Bucket string `json:"bucket"` // e.g., "ENTITY_STATES"
Interface *InterfaceContract `json:"interface,omitempty"` // Data type contract
}
KVWritePort - NATS KV Write for state persistence
func (KVWritePort) IsExclusive ¶
func (k KVWritePort) IsExclusive() bool
IsExclusive returns false as multiple writers are allowed (with CAS handling)
func (KVWritePort) ResourceID ¶
func (k KVWritePort) ResourceID() string
ResourceID returns unique identifier for KV write ports
type LifecycleComponent ¶
type LifecycleComponent interface {
Discoverable
Initialize() error
Start(ctx context.Context) error
Stop(timeout time.Duration) error
}
LifecycleComponent defines components that support full lifecycle management following the unified Pattern A:
- Initialize() error // Setup/create only, NO context
- Start(ctx context.Context) error // Start with context passed through
- Stop(timeout time.Duration) error // Stop with timeout for graceful shutdown
func AsLifecycleComponent ¶
func AsLifecycleComponent(comp Discoverable) (LifecycleComponent, bool)
AsLifecycleComponent safely casts a component to LifecycleComponent
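The lifecycle contract and the safe cast can be sketched together. In the block below, Discoverable is reduced to a single hypothetical Name method, and worker, staticInfo, and runLifecycle are illustrative names; the component receives its context as a parameter and does not store it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Discoverable is reduced to one method for this sketch.
type Discoverable interface{ Name() string }

type LifecycleComponent interface {
	Discoverable
	Initialize() error
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

type worker struct {
	stop     chan struct{} // closed by Stop to request shutdown
	done     chan struct{} // closed by the goroutine when it exits
	stopOnce sync.Once
}

func (w *worker) Name() string { return "worker" }

// Initialize only allocates state; it takes no context and does no I/O.
func (w *worker) Initialize() error {
	w.stop = make(chan struct{})
	w.done = make(chan struct{})
	return nil
}

// Start launches the goroutine, which exits on context cancellation or Stop.
func (w *worker) Start(ctx context.Context) error {
	if w.stop == nil {
		return errors.New("start called before initialize")
	}
	go func() {
		defer close(w.done)
		select {
		case <-ctx.Done():
		case <-w.stop:
		}
	}()
	return nil
}

// Stop requests shutdown and waits up to timeout for the goroutine to exit.
func (w *worker) Stop(timeout time.Duration) error {
	if w.stop == nil {
		return nil // never initialized, nothing to stop
	}
	w.stopOnce.Do(func() { close(w.stop) })
	select {
	case <-w.done:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("worker did not stop within %s", timeout)
	}
}

// staticInfo is discoverable but has no lifecycle methods.
type staticInfo struct{}

func (staticInfo) Name() string { return "static" }

func asLifecycleComponent(comp Discoverable) (LifecycleComponent, bool) {
	lc, ok := comp.(LifecycleComponent)
	return lc, ok
}

var errNotLifecycle = errors.New("component has no lifecycle methods")

// runLifecycle drives Initialize, Start and Stop in order.
func runLifecycle(comp Discoverable) error {
	lc, ok := asLifecycleComponent(comp)
	if !ok {
		return errNotLifecycle
	}
	if err := lc.Initialize(); err != nil {
		return err
	}
	if err := lc.Start(context.Background()); err != nil {
		return err
	}
	return lc.Stop(time.Second)
}

func main() {
	fmt.Println(runLifecycle(&worker{}))
	fmt.Println(runLifecycle(staticInfo{}))
}
```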
type LifecycleFactory ¶
type LifecycleFactory func() LifecycleComponent
LifecycleFactory creates a new instance of a LifecycleComponent for testing
type LifecycleReporter ¶
type LifecycleReporter interface {
// ReportStage updates the component's current processing stage
ReportStage(ctx context.Context, stage string) error
// ReportCycleStart marks the beginning of a new processing cycle
ReportCycleStart(ctx context.Context) error
// ReportCycleComplete marks successful cycle completion
ReportCycleComplete(ctx context.Context) error
// ReportCycleError marks cycle failure with error details
ReportCycleError(ctx context.Context, err error) error
}
LifecycleReporter allows components to report their current processing stage. This enables observability for long-running async components (ADR-003).
func NewLifecycleReporterFromConfig ¶
func NewLifecycleReporterFromConfig(cfg LifecycleReporterConfig) LifecycleReporter
NewLifecycleReporterFromConfig creates a lifecycle reporter based on configuration. Returns either a throttled KV reporter, plain KV reporter, or no-op reporter.
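The three-way choice can be pictured with the sketch below. It is an illustration under several assumptions, not the package's logic: the Reporter interface is cut down to ReportStage without a context, memoryReporter stands in for the KV-backed reporter, a missing store is assumed to yield the no-op reporter, and the throttle is assumed to drop writes that arrive too soon.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Reporter is a cut-down stand-in for LifecycleReporter.
type Reporter interface{ ReportStage(stage string) error }

type noOpReporter struct{}

func (noOpReporter) ReportStage(string) error { return nil }

// memoryReporter stands in for the KV-backed reporter and records writes.
type memoryReporter struct {
	mu     sync.Mutex
	stages []string
}

func (m *memoryReporter) ReportStage(stage string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.stages = append(m.stages, stage)
	return nil
}

func (m *memoryReporter) count() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.stages)
}

// throttledReporter forwards at most one write per interval and silently
// drops the rest (an assumed policy for this sketch).
type throttledReporter struct {
	mu       sync.Mutex
	next     Reporter
	interval time.Duration
	last     time.Time
}

func (t *throttledReporter) ReportStage(stage string) error {
	t.mu.Lock()
	now := time.Now()
	if !t.last.IsZero() && now.Sub(t.last) < t.interval {
		t.mu.Unlock()
		return nil
	}
	t.last = now
	t.mu.Unlock()
	return t.next.ReportStage(stage)
}

// Config is a hypothetical, reduced form of LifecycleReporterConfig.
type Config struct {
	Store            *memoryReporter
	EnableThrottling bool
	ThrottleInterval time.Duration
	Disabled         bool
}

// newReporter picks no-op, throttled, or plain reporting from the config.
func newReporter(cfg Config) Reporter {
	if cfg.Disabled || cfg.Store == nil {
		return noOpReporter{}
	}
	if cfg.EnableThrottling {
		interval := cfg.ThrottleInterval
		if interval <= 0 {
			interval = time.Second // documented default interval
		}
		return &throttledReporter{next: cfg.Store, interval: interval}
	}
	return cfg.Store
}

func main() {
	store := &memoryReporter{}
	r := newReporter(Config{Store: store, EnableThrottling: true})
	_ = r.ReportStage("loading")
	_ = r.ReportStage("indexing") // arrives within the interval, dropped
	fmt.Println("writes recorded:", store.count())
}
```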
type LifecycleReporterConfig ¶
type LifecycleReporterConfig struct {
// KV is the NATS KV bucket for status storage. Required for KV reporter.
KV jetstream.KeyValue
// ComponentName is the name of the component. Required.
ComponentName string
// Logger for the reporter. Optional.
Logger *slog.Logger
// EnableThrottling enables throttled writes. Default: true.
EnableThrottling bool
// ThrottleInterval is the minimum interval between writes.
// Only used if EnableThrottling is true. Default: 1 second.
ThrottleInterval time.Duration
// Disabled returns a no-op reporter if true.
Disabled bool
}
LifecycleReporterConfig holds configuration for creating lifecycle reporters.
func DefaultLifecycleReporterConfig ¶
func DefaultLifecycleReporterConfig(componentName string) LifecycleReporterConfig
DefaultLifecycleReporterConfig returns configuration with sensible defaults. EnableThrottling is true with 1 second interval.
type LogEntry ¶
type LogEntry struct {
Timestamp string `json:"timestamp"` // RFC3339 format
Level LogLevel `json:"level"`
Component string `json:"component"`
FlowID string `json:"flow_id"`
Message string `json:"message"`
Stack string `json:"stack,omitempty"` // Stack trace for errors
}
LogEntry represents a structured log entry that can be published to NATS and consumed by the Flow Builder SSE endpoint.
type LogLevel ¶
type LogLevel string
LogLevel represents the severity level of a log entry
const (
// LogLevelDebug represents debug-level logs
LogLevelDebug LogLevel = "DEBUG"
// LogLevelInfo represents informational logs
LogLevelInfo LogLevel = "INFO"
// LogLevelWarn represents warning logs
LogLevelWarn LogLevel = "WARN"
// LogLevelError represents error logs
LogLevelError LogLevel = "ERROR"
)
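As a quick illustration of the wire format, the sketch below encodes a LogEntry to JSON. The type definitions are copied from this page; encodeLogEntry and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type LogLevel string

const (
	LogLevelInfo  LogLevel = "INFO"
	LogLevelError LogLevel = "ERROR"
)

type LogEntry struct {
	Timestamp string   `json:"timestamp"` // RFC3339 format
	Level     LogLevel `json:"level"`
	Component string   `json:"component"`
	FlowID    string   `json:"flow_id"`
	Message   string   `json:"message"`
	Stack     string   `json:"stack,omitempty"`
}

// encodeLogEntry serializes an entry; Stack is omitted when empty.
func encodeLogEntry(e LogEntry) (string, error) {
	data, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(data), nil
}

func main() {
	entry := LogEntry{
		Timestamp: time.Now().UTC().Format(time.RFC3339),
		Level:     LogLevelError,
		Component: "udp",
		FlowID:    "flow-1",
		Message:   "bind failed",
	}
	line, err := encodeLogEntry(entry)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(line)
}
```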
type ManagedComponent ¶
type ManagedComponent struct {
// Component is the actual component instance
Component Discoverable
// State tracks the current lifecycle state
State State
// Named Context Management for Individual Component Lifecycle Control
//
// These fields store named child contexts to enable individual component cancellation
// during shutdown. This follows the pattern where ComponentManager creates a child
// context for each component and passes it to lifecycle.Start(ctx).
//
// The component itself NEVER stores the context - it receives it as a parameter
// following proper Go idioms. Only the ComponentManager stores these contexts
// to coordinate orderly shutdown and individual component cancellation.
//
// Pattern:
// 1. ComponentManager creates: ctx, cancel := context.WithCancel(parentCtx)
// 2. ComponentManager stores: mc.Context = ctx, mc.Cancel = cancel
// 3. ComponentManager calls: lifecycle.Start(mc.Context)
// 4. Component uses context as parameter (proper Go idiom)
// 5. ComponentManager can cancel individual components: mc.Cancel()
Context context.Context // Named child context for this specific component
Cancel context.CancelFunc // Named cancellation for this specific component
// StartOrder tracks the order components were started for reverse shutdown
StartOrder int
// LastError tracks the last error that occurred during lifecycle operations
LastError error
}
ManagedComponent tracks a component and its lifecycle state. This is used by ComponentManager to properly manage component lifecycle.
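The named-context pattern described in the struct comments can be sketched as follows. ComponentManager is not documented on this page, so manager, managed, start, and stopAll are hypothetical; the sketch only shows per-component child contexts and reverse-order cancellation.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// managed is a reduced stand-in for ManagedComponent.
type managed struct {
	name       string
	ctx        context.Context
	cancel     context.CancelFunc
	startOrder int
}

// manager owns the contexts; components only receive them as parameters.
type manager struct {
	parent     context.Context
	components []*managed
}

func newManager() *manager {
	return &manager{parent: context.Background()}
}

// start creates a child context for one component and records start order.
// A real manager would now call lifecycle.Start(mc.ctx) on the component.
func (m *manager) start(name string) *managed {
	ctx, cancel := context.WithCancel(m.parent)
	mc := &managed{name: name, ctx: ctx, cancel: cancel, startOrder: len(m.components)}
	m.components = append(m.components, mc)
	return mc
}

// stopAll cancels components in reverse start order and returns the names
// in the order they were cancelled.
func (m *manager) stopAll() []string {
	ordered := append([]*managed(nil), m.components...)
	sort.Slice(ordered, func(i, j int) bool {
		return ordered[i].startOrder > ordered[j].startOrder
	})
	stopped := make([]string, 0, len(ordered))
	for _, mc := range ordered {
		mc.cancel()
		stopped = append(stopped, mc.name)
	}
	return stopped
}

func main() {
	m := newManager()
	input := m.start("input")
	processor := m.start("processor")

	input.cancel() // cancel one component without touching the other
	fmt.Println(input.ctx.Err(), processor.ctx.Err())

	fmt.Println(m.stopAll())
}
```

Cancelling an already cancelled context is a no-op, so stopAll can safely include components that were stopped individually earlier.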
type Metadata ¶
type Metadata struct {
Name string `json:"name"`
Type string `json:"type"` // "input", "processor", "output", "storage"
Description string `json:"description"`
Version string `json:"version"`
}
Metadata describes what a component is
type NATSPort ¶
type NATSPort struct {
Subject string `json:"subject"`
Queue string `json:"queue,omitempty"`
Interface *InterfaceContract `json:"interface,omitempty"`
}
NATSPort - NATS pub/sub
func (NATSPort) IsExclusive ¶
IsExclusive returns false as multiple components can subscribe
func (NATSPort) ResourceID ¶
ResourceID returns unique identifier for NATS ports
type NATSRequestPort ¶
type NATSRequestPort struct {
Subject string `json:"subject"`
Timeout string `json:"timeout,omitempty"` // Duration string e.g. "1s", "500ms"
Retries int `json:"retries,omitempty"`
Interface *InterfaceContract `json:"interface,omitempty"`
}
NATSRequestPort - NATS Request/Response pattern for synchronous operations
func (NATSRequestPort) IsExclusive ¶
func (n NATSRequestPort) IsExclusive() bool
IsExclusive returns false as multiple components can handle requests
func (NATSRequestPort) ResourceID ¶
func (n NATSRequestPort) ResourceID() string
ResourceID returns unique identifier for NATS request ports
func (NATSRequestPort) Type ¶
func (n NATSRequestPort) Type() string
Type returns the port type identifier
type NATSRequestPortConfig ¶
type NATSRequestPortConfig struct {
Subject string `json:"subject"`
Timeout string `json:"timeout,omitempty"`
}
NATSRequestPortConfig represents a NATS request/reply port configuration. It is a reduced counterpart of NATSRequestPort (Subject and Timeout only), kept for test compatibility.
func (NATSRequestPortConfig) IsExclusive ¶
func (n NATSRequestPortConfig) IsExclusive() bool
IsExclusive returns false as multiple components can handle requests
func (NATSRequestPortConfig) ResourceID ¶
func (n NATSRequestPortConfig) ResourceID() string
ResourceID returns unique identifier for NATS request ports
func (NATSRequestPortConfig) Type ¶
func (n NATSRequestPortConfig) Type() string
Type returns the port type identifier
type NATSStreamPortConfig ¶
type NATSStreamPortConfig struct {
Subject string `json:"subject"`
Consumer string `json:"consumer,omitempty"`
}
NATSStreamPortConfig represents a NATS streaming port configuration. Used for stream-based message delivery patterns.
func (NATSStreamPortConfig) IsExclusive ¶
func (n NATSStreamPortConfig) IsExclusive() bool
IsExclusive returns false as multiple components can subscribe
func (NATSStreamPortConfig) ResourceID ¶
func (n NATSStreamPortConfig) ResourceID() string
ResourceID returns unique identifier for NATS stream ports
func (NATSStreamPortConfig) Type ¶
func (n NATSStreamPortConfig) Type() string
Type returns the port type identifier
type NetworkPort ¶
type NetworkPort struct {
Protocol string `json:"protocol"` // "tcp", "udp"
Host string `json:"host"` // "0.0.0.0", "localhost"
Port int `json:"port"` // 14550, 8080
}
NetworkPort - TCP/UDP network bindings
func (NetworkPort) IsExclusive ¶
func (n NetworkPort) IsExclusive() bool
IsExclusive returns true as network ports are exclusive
func (NetworkPort) ResourceID ¶
func (n NetworkPort) ResourceID() string
ResourceID returns unique identifier for network ports
type NoOpLifecycleReporter ¶
type NoOpLifecycleReporter struct{}
NoOpLifecycleReporter is a no-op implementation for when lifecycle reporting is disabled.
func NewNoOpLifecycleReporter ¶
func NewNoOpLifecycleReporter() *NoOpLifecycleReporter
NewNoOpLifecycleReporter creates a no-op lifecycle reporter.
func (*NoOpLifecycleReporter) ReportCycleComplete ¶
func (r *NoOpLifecycleReporter) ReportCycleComplete(_ context.Context) error
ReportCycleComplete is a no-op.
func (*NoOpLifecycleReporter) ReportCycleError ¶
func (r *NoOpLifecycleReporter) ReportCycleError(_ context.Context, _ error) error
ReportCycleError is a no-op.
func (*NoOpLifecycleReporter) ReportCycleStart ¶
func (r *NoOpLifecycleReporter) ReportCycleStart(_ context.Context) error
ReportCycleStart is a no-op.
func (*NoOpLifecycleReporter) ReportStage ¶
func (r *NoOpLifecycleReporter) ReportStage(_ context.Context, _ string) error
ReportStage is a no-op.
type PayloadBuilder ¶
PayloadBuilder creates a typed payload from field mappings. Used by workflow variable interpolation to construct typed payloads from step output maps. Returns error if required fields are missing or field values cannot be converted to the target type. Returns any to avoid import cycles - the actual payload should implement message.Payload.
OPTIONAL: If not provided, BuildPayload falls back to JSON marshal/unmarshal using the Factory to create the target type. Custom builders are only needed for performance optimization of high-frequency payload types.
type PayloadFactory ¶
type PayloadFactory func() any
PayloadFactory creates a payload instance for a specific message type. The factory returns an any to avoid import cycles. The actual payload should implement the message.Payload interface.
type PayloadRegistration ¶
type PayloadRegistration struct {
Factory PayloadFactory `json:"-"` // Factory function (not serializable)
Builder PayloadBuilder `json:"-"` // Builder function (not serializable)
Domain string `json:"domain"` // Message domain (e.g., "robotics", "sensors")
Category string `json:"category"` // Message category (e.g., "heartbeat", "gps")
Version string `json:"version"` // Schema version (e.g., "v1", "v2")
Description string `json:"description"` // Human-readable description
Example map[string]any `json:"example"` // Optional example payload data
}
PayloadRegistration holds factory and metadata for a payload type. This follows the same pattern as component Registration but is specific to message payload types.
func (*PayloadRegistration) MessageType ¶
func (pr *PayloadRegistration) MessageType() string
MessageType returns the formatted message type string for this registration. Format: "domain.category.version" (e.g., "robotics.heartbeat.v1")
type PayloadRegistry ¶
type PayloadRegistry struct {
// contains filtered or unexported fields
}
PayloadRegistry manages payload factories for message deserialization. It provides thread-safe registration and lookup of payload factories, enabling BaseMessage.UnmarshalJSON to recreate typed payloads from JSON.
The registry follows the same patterns as the component Registry but is specifically designed for message payload types.
func GlobalPayloadRegistry ¶
func GlobalPayloadRegistry() *PayloadRegistry
GlobalPayloadRegistry returns the global payload registry. This is useful for introspection, such as listing all registered payloads.
func NewPayloadRegistry ¶
func NewPayloadRegistry() *PayloadRegistry
NewPayloadRegistry creates a new empty payload registry.
func (*PayloadRegistry) BuildPayload ¶
func (pr *PayloadRegistry) BuildPayload(domain, category, version string, fields map[string]any) (any, error)
BuildPayload creates a typed payload from field mappings. If a custom Builder is registered, it is used for efficient field mapping. Otherwise, falls back to JSON marshal/unmarshal using the Factory.
Returns an error if the message type is not registered or if building fails. This is used by workflow variable interpolation to construct typed payloads from step output maps. Returns any to avoid import cycles - the actual payload implements message.Payload.
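The JSON fallback path can be sketched in a few lines. This is an illustration of the marshal/unmarshal round trip only: buildViaJSON and the heartbeat payload are hypothetical, and the factory is assumed to return a pointer so that the decoder can populate it.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// heartbeat is a hypothetical typed payload.
type heartbeat struct {
	Source string `json:"source"`
	Seq    int    `json:"seq"`
}

// buildViaJSON encodes the field map to JSON and decodes it into a fresh
// payload from the factory. The factory must return a pointer; otherwise
// json.Unmarshal reports an error.
func buildViaJSON(factory func() any, fields map[string]any) (any, error) {
	if factory == nil {
		return nil, errors.New("no factory registered")
	}
	payload := factory()
	data, err := json.Marshal(fields)
	if err != nil {
		return nil, fmt.Errorf("marshal fields: %w", err)
	}
	if err := json.Unmarshal(data, payload); err != nil {
		return nil, fmt.Errorf("populate payload: %w", err)
	}
	return payload, nil
}

func main() {
	newHeartbeat := func() any { return &heartbeat{} }

	p, err := buildViaJSON(newHeartbeat, map[string]any{"source": "drone-1", "seq": 42})
	fmt.Printf("%+v %v\n", p, err)

	_, err = buildViaJSON(newHeartbeat, map[string]any{"seq": "not a number"})
	fmt.Println(err)
}
```

A plain JSON round trip catches type mismatches but does not notice missing required fields, which is one reason a custom builder can still be useful.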
func (*PayloadRegistry) CreatePayload ¶
func (pr *PayloadRegistry) CreatePayload(domain, category, version string) any
CreatePayload creates a payload instance using the registered factory. Returns nil if the message type is not registered. This allows BaseMessage.UnmarshalJSON to handle unknown types gracefully by falling back to GenericPayload.
func (*PayloadRegistry) GetRegistration ¶
func (pr *PayloadRegistry) GetRegistration(msgType string) (*PayloadRegistration, bool)
GetRegistration returns the payload registration for a specific message type. Returns the registration and true if found, nil and false otherwise.
func (*PayloadRegistry) ListByDomain ¶
func (pr *PayloadRegistry) ListByDomain(domain string) []*PayloadRegistration
ListByDomain returns all payload registrations for a specific domain. This is useful for discovering what message types are available within a particular domain (e.g., "robotics", "sensors").
func (*PayloadRegistry) ListPayloads ¶
func (pr *PayloadRegistry) ListPayloads() map[string]*PayloadRegistration
ListPayloads returns all registered payload types. Returns a copy of the registrations map to prevent external modification.
func (*PayloadRegistry) RegisterPayload ¶
func (pr *PayloadRegistry) RegisterPayload(registration *PayloadRegistration) error
RegisterPayload registers a payload factory with validation. The message type is derived from the registration's Domain, Category, and Version fields. Returns an error if validation fails or the type is already registered.
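A thread-safe registry with these semantics could be sketched as below. Registration and Registry are reduced, hypothetical stand-ins, and the exact validation rules (non-nil factory, non-empty domain, category, and version) are assumptions; the "domain.category.version" key format and the nil result for unknown types follow the descriptions above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Registration is a reduced stand-in for PayloadRegistration.
type Registration struct {
	Factory  func() any
	Domain   string
	Category string
	Version  string
}

// MessageType formats the key as "domain.category.version".
func (r *Registration) MessageType() string {
	return r.Domain + "." + r.Category + "." + r.Version
}

// Registry guards its map with an RWMutex so lookups can run concurrently.
type Registry struct {
	mu      sync.RWMutex
	entries map[string]*Registration
}

func NewRegistry() *Registry {
	return &Registry{entries: make(map[string]*Registration)}
}

var errDuplicate = errors.New("message type already registered")

// Register validates the registration and rejects duplicates.
func (r *Registry) Register(reg *Registration) error {
	if reg == nil || reg.Factory == nil {
		return errors.New("registration needs a factory")
	}
	if reg.Domain == "" || reg.Category == "" || reg.Version == "" {
		return errors.New("domain, category and version are required")
	}
	key := reg.MessageType()
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.entries[key]; exists {
		return fmt.Errorf("%w: %s", errDuplicate, key)
	}
	r.entries[key] = reg
	return nil
}

// Create returns a fresh payload, or nil when the type is unknown.
func (r *Registry) Create(domain, category, version string) any {
	key := domain + "." + category + "." + version
	r.mu.RLock()
	reg, ok := r.entries[key]
	r.mu.RUnlock()
	if !ok {
		return nil
	}
	return reg.Factory()
}

// heartbeat is a hypothetical payload type.
type heartbeat struct{ Seq int }

func main() {
	registry := NewRegistry()
	reg := &Registration{
		Factory:  func() any { return &heartbeat{} },
		Domain:   "robotics",
		Category: "heartbeat",
		Version:  "v1",
	}
	fmt.Println(reg.MessageType(), registry.Register(reg))
	fmt.Println(registry.Register(reg)) // duplicate
	fmt.Println(registry.Create("robotics", "gps", "v1") == nil)
}
```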
type PlatformMeta ¶
type PlatformMeta = types.PlatformMeta
PlatformMeta provides platform identity to components. Type alias to avoid import cycles while maintaining compatibility.
type Port ¶
type Port struct {
Name string `json:"name"`
Direction Direction `json:"direction"`
Required bool `json:"required"`
Description string `json:"description"`
Config Portable `json:"config"`
}
Port describes any I/O interface
func BuildPortFromDefinition ¶
func BuildPortFromDefinition(def PortDefinition, direction Direction) Port
BuildPortFromDefinition creates a Port from a PortDefinition
func MergePortConfigs ¶
func MergePortConfigs(defaults []Port, overrides []PortDefinition, direction Direction) []Port
MergePortConfigs merges default ports with configured overrides
func (Port) MarshalJSON ¶
MarshalJSON provides custom JSON marshaling for the Port struct. This handles the Portable interface by creating a wrapper with type information.
func (*Port) UnmarshalJSON ¶
UnmarshalJSON provides custom JSON unmarshaling for the Port struct. This handles reconstruction of the Portable interface from JSON.
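One common way to serialize an interface-typed field is a wrapper that carries a type discriminator next to the raw config. The sketch below shows that technique; the wire layout, the "nats" and "network" type strings, and the reduced Port and Portable definitions are assumptions, not the package's actual format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Portable is reduced to the discriminator method for this sketch.
type Portable interface{ Type() string }

type NATSPort struct {
	Subject string `json:"subject"`
}

func (NATSPort) Type() string { return "nats" }

type NetworkPort struct {
	Protocol string `json:"protocol"`
	Host     string `json:"host"`
	Port     int    `json:"port"`
}

func (NetworkPort) Type() string { return "network" }

// Port is a reduced stand-in holding a name and an interface-typed config.
type Port struct {
	Name   string
	Config Portable
}

// portWire is the assumed on-the-wire wrapper with type information.
type portWire struct {
	Name   string          `json:"name"`
	Type   string          `json:"type"`
	Config json.RawMessage `json:"config"`
}

func (p Port) MarshalJSON() ([]byte, error) {
	if p.Config == nil {
		return nil, fmt.Errorf("port %q has no config", p.Name)
	}
	raw, err := json.Marshal(p.Config)
	if err != nil {
		return nil, err
	}
	return json.Marshal(portWire{Name: p.Name, Type: p.Config.Type(), Config: raw})
}

// UnmarshalJSON uses the type field to pick the concrete config struct.
func (p *Port) UnmarshalJSON(data []byte) error {
	var w portWire
	if err := json.Unmarshal(data, &w); err != nil {
		return err
	}
	switch w.Type {
	case "nats":
		var c NATSPort
		if err := json.Unmarshal(w.Config, &c); err != nil {
			return err
		}
		p.Config = c
	case "network":
		var c NetworkPort
		if err := json.Unmarshal(w.Config, &c); err != nil {
			return err
		}
		p.Config = c
	default:
		return fmt.Errorf("unknown port type %q", w.Type)
	}
	p.Name = w.Name
	return nil
}

// roundTrip encodes a port and decodes it again.
func roundTrip(p Port) (Port, error) {
	data, err := json.Marshal(p)
	if err != nil {
		return Port{}, err
	}
	var out Port
	if err := json.Unmarshal(data, &out); err != nil {
		return Port{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(Port{Name: "udp_in", Config: NetworkPort{Protocol: "udp", Host: "0.0.0.0", Port: 14550}})
	fmt.Printf("%+v %v\n", out, err)
}
```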
type PortCapability ¶
type PortCapability struct {
Name string `json:"name"`
Subject string `json:"subject"`
Type string `json:"type"`
Interface string `json:"interface,omitempty"`
Description string `json:"description,omitempty"`
}
PortCapability describes an input or output port for discovery.
type PortConfig ¶
type PortConfig struct {
Inputs []PortDefinition `json:"inputs,omitempty"`
Outputs []PortDefinition `json:"outputs,omitempty"`
KVWrite []PortDefinition `json:"kv_write,omitempty"`
}
PortConfig represents port configuration in component config
type PortDefinition ¶
type PortDefinition struct {
Name string `json:"name" schema:"readonly,type:string,description:Port identifier"`
Type string `json:"type,omitempty" schema:"readonly,type:string,description:Port type (nats jetstream kv-watch etc)"`
Subject string `json:"subject,omitempty" schema:"editable,type:string,description:NATS subject pattern or network address"`
Interface string `json:"interface,omitempty" schema:"readonly,type:string,description:Interface contract type"`
Required bool `json:"required,omitempty" schema:"readonly,type:bool,description:Whether port connection is required"`
Description string `json:"description,omitempty" schema:"readonly,type:string,description:Human-readable port description"`
Timeout string `json:"timeout,omitempty" schema:"editable,type:string,description:Request timeout for request/reply ports"`
StreamName string `json:"stream_name,omitempty" schema:"editable,type:string,description:JetStream stream name"`
Bucket string `json:"bucket,omitempty" schema:"editable,type:string,description:KV bucket name for KV ports"`
// Config holds type-specific port configuration (e.g., JetStreamPort for consumer settings)
Config any `json:"config,omitempty" schema:"editable,type:object,description:Type-specific port configuration"`
}
PortDefinition represents a port configuration from JSON
type PortFieldInfo ¶
PortFieldInfo describes metadata for PortDefinition fields
type Portable ¶
type Portable interface {
ResourceID() string // Unique identifier for conflict detection
IsExclusive() bool // True if the resource cannot be shared between components
Type() string // Port type identifier
}
Portable is the minimal interface that every port configuration implements. Method names omit the Get prefix, following Go idiom.
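ResourceID and IsExclusive together are enough to detect port conflicts. The sketch below is one possible check, not the package's implementation: findConflicts is hypothetical, and the ResourceID formats used for NetworkPort and NATSPort are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

type Portable interface {
	ResourceID() string
	IsExclusive() bool
	Type() string
}

type NetworkPort struct {
	Protocol string
	Host     string
	Port     int
}

// The ID format below is an assumption made for this sketch.
func (n NetworkPort) ResourceID() string {
	return fmt.Sprintf("%s:%s:%d", n.Protocol, n.Host, n.Port)
}
func (n NetworkPort) IsExclusive() bool { return true }
func (n NetworkPort) Type() string      { return "network" }

type NATSPort struct{ Subject string }

func (n NATSPort) ResourceID() string { return "nats:" + n.Subject }
func (n NATSPort) IsExclusive() bool  { return false }
func (n NATSPort) Type() string       { return "nats" }

// findConflicts returns, in sorted order, every resource ID that is claimed
// more than once while at least one claimant is exclusive.
func findConflicts(ports []Portable) []string {
	claims := make(map[string]int)
	exclusive := make(map[string]bool)
	for _, p := range ports {
		id := p.ResourceID()
		claims[id]++
		if p.IsExclusive() {
			exclusive[id] = true
		}
	}
	var conflicts []string
	for id, n := range claims {
		if n > 1 && exclusive[id] {
			conflicts = append(conflicts, id)
		}
	}
	sort.Strings(conflicts)
	return conflicts
}

func main() {
	ports := []Portable{
		NetworkPort{Protocol: "udp", Host: "0.0.0.0", Port: 14550},
		NetworkPort{Protocol: "udp", Host: "0.0.0.0", Port: 14550},
		NATSPort{Subject: "events.graph"},
		NATSPort{Subject: "events.graph"},
	}
	fmt.Println(findConflicts(ports))
	// Prints: [udp:0.0.0.0:14550]
}
```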
type ProcessorMetrics ¶
type ProcessorMetrics struct {
// EventsProcessed counts total events processed, labeled by operation type
EventsProcessed *prometheus.CounterVec
// EventsErrors counts total processing errors, labeled by error type
EventsErrors *prometheus.CounterVec
// KVOperations counts KV bucket operations, labeled by operation (get/put/delete/watch)
KVOperations *prometheus.CounterVec
// ProcessingDuration measures processing latency in seconds
ProcessingDuration prometheus.Histogram
// contains filtered or unexported fields
}
ProcessorMetrics provides standard Prometheus metrics for processor components. Each processor component should create its own instance with a unique subsystem name.
func NewProcessorMetrics ¶
func NewProcessorMetrics(registry *metric.MetricsRegistry, subsystem string) *ProcessorMetrics
NewProcessorMetrics creates and registers processor metrics with the given subsystem name. The subsystem name should be the component name with underscores (e.g., "graph_ingest"). If registry is nil, metrics are created but not registered (useful for testing).
func (*ProcessorMetrics) ObserveDuration ¶
func (m *ProcessorMetrics) ObserveDuration(seconds float64)
ObserveDuration records a processing duration
func (*ProcessorMetrics) RecordError ¶
func (m *ProcessorMetrics) RecordError(errorType string)
RecordError increments the error counter for the given error type
func (*ProcessorMetrics) RecordEvent ¶
func (m *ProcessorMetrics) RecordEvent(operation string)
RecordEvent increments the events processed counter for the given operation
func (*ProcessorMetrics) RecordKVOperation ¶
func (m *ProcessorMetrics) RecordKVOperation(operation string)
RecordKVOperation increments the KV operations counter
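A typical use of these methods is to wrap each unit of work so that its duration and outcome are always recorded. The sketch below avoids a Prometheus dependency by declaring a small local interface with the same method names and signatures as listed above; recorder, fakeMetrics, and instrument are hypothetical names, and the "_failed" error label is purely illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// recorder lists the ProcessorMetrics methods used here, so that a
// *ProcessorMetrics or a test double can be passed in.
type recorder interface {
	RecordEvent(operation string)
	RecordError(errorType string)
	ObserveDuration(seconds float64)
}

// fakeMetrics is a stand-in that counts calls instead of updating Prometheus.
type fakeMetrics struct {
	events, errs map[string]int
	observations int
}

func newFakeMetrics() *fakeMetrics {
	return &fakeMetrics{events: map[string]int{}, errs: map[string]int{}}
}

func (f *fakeMetrics) RecordEvent(op string)     { f.events[op]++ }
func (f *fakeMetrics) RecordError(kind string)   { f.errs[kind]++ }
func (f *fakeMetrics) ObserveDuration(_ float64) { f.observations++ }

// instrument runs work, records its duration in seconds, and counts the
// outcome as either a processed event or an error.
func instrument(m recorder, operation string, work func() error) error {
	start := time.Now()
	err := work()
	m.ObserveDuration(time.Since(start).Seconds())
	if err != nil {
		m.RecordError(operation + "_failed") // illustrative label, not a package convention
		return err
	}
	m.RecordEvent(operation)
	return nil
}

func main() {
	m := newFakeMetrics()
	_ = instrument(m, "ingest", func() error { return nil })
	_ = instrument(m, "ingest", func() error { return errors.New("boom") })
	fmt.Println(m.events["ingest"], m.errs["ingest_failed"], m.observations)
}
```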
type PropertySchema ¶
type PropertySchema struct {
Type string `json:"type"` // "string", "int", "bool", "float", "enum", "array", "object", "ports", "cache"
Description string `json:"description"`
Default any `json:"default,omitempty"`
Enum []string `json:"enum,omitempty"` // Valid string values
Minimum *int `json:"minimum,omitempty"` // For numeric types
Maximum *int `json:"maximum,omitempty"` // For numeric types
Category string `json:"category,omitempty"` // "basic" or "advanced" for UI organization
PortFields map[string]PortFieldInfo `json:"portFields,omitempty"` // Metadata for port fields (when type is "ports")
CacheFields map[string]CacheFieldInfo `json:"cacheFields,omitempty"` // Metadata for cache fields (when type is "cache")
Properties map[string]PropertySchema `json:"properties,omitempty"` // Nested properties for object types
Required []string `json:"required,omitempty"` // Required nested fields for object types
Items *PropertySchema `json:"items,omitempty"` // Item schema for array types
}
PropertySchema describes a single configuration property
type Registerable ¶
type Registerable interface {
Registration() Registration
}
Registerable allows components to self-describe for registry registration
type Registration ¶
type Registration struct {
Name string `json:"name"` // Factory name (e.g., "udp-input")
Type string `json:"type"` // Component type (input/processor/output/storage)
Protocol string `json:"protocol"` // Technical protocol (udp, mavlink, websocket, etc.)
Domain string `json:"domain"` // Business domain (robotics, semantic, network, storage)
Description string `json:"description"` // Human-readable description
Version string `json:"version"` // Component version
Schema ConfigSchema `json:"schema"` // Schema as static metadata (Feature 011)
Factory Factory `json:"-"` // Factory function (not serializable)
Dependencies []string `json:"dependencies"` // Optional: other required components
}
Registration holds factory and metadata for a component type
type RegistrationConfig ¶
type RegistrationConfig struct {
Name string // Component name (e.g., "udp", "websocket", "graph-processor")
Factory Factory // Factory function to create component instances
Schema ConfigSchema // Configuration schema for validation and discovery
Type string // Component type: "input", "processor", "output", "storage"
Protocol string // Technical protocol (udp, tcp, websocket, file, etc.)
Domain string // Business domain (network, storage, processing, robotics, semantic)
Description string // Human-readable description of the component
Version string // Component version (semver recommended)
}
RegistrationConfig provides a clean API for component registration. This config struct replaces the previous 7-8 parameter function signatures. Each of its fields maps directly to the Registration field of the same name; Registration's Dependencies field has no counterpart here.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages component factories and instances. It provides thread-safe registration and lookup of both factories (for creation) and instances (for discovery and management).
func NewRegistry ¶
NewRegistry creates a new, empty component registry. It optionally accepts a logger and defaults to slog.Default() if none is provided, which maintains backwards compatibility with existing callers.
func (*Registry) Component ¶
func (r *Registry) Component(name string) Discoverable
Component retrieves a specific component instance by name. It returns nil if the component is not found.
func (*Registry) CreateComponent ¶
func (r *Registry) CreateComponent( instanceName string, config types.ComponentConfig, deps Dependencies, ) (Discoverable, error)
CreateComponent creates and registers a new component instance. The instanceName parameter is the unique identifier for this instance (e.g., "udp-sensor-main"). The config contains the factory name, type, and component-specific configuration. Factory functions don't do I/O, so no context is needed.
func (*Registry) CreatePayload ¶
CreatePayload creates a payload instance using the registered factory. Returns nil if the message type is not registered.
func (*Registry) GetCapabilities ¶
func (r *Registry) GetCapabilities(subjectPattern string) []*CapabilityAnnouncement
GetCapabilities returns capabilities matching the subject pattern. It supports NATS wildcards: "*" matches one token, and ">" matches one or more tokens at the end. It returns an empty slice (not nil) when nothing matches or the cache is empty. Thread-safe for concurrent access.
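The wildcard rules stated above can be sketched as a small token-by-token matcher. matchSubject is a hypothetical helper written for illustration, not the function the Registry uses, and it does not validate that subjects are well formed.

```go
package main

import (
	"fmt"
	"strings"
)

// matchSubject reports whether subject matches pattern under the rules
// described above: "*" matches exactly one token and ">" matches one or
// more trailing tokens. Tokens are separated by ".".
func matchSubject(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" is only meaningful as the last pattern token and needs
			// at least one subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// Without a trailing ">", both sides must have the same token count.
	return len(p) == len(s)
}

func main() {
	fmt.Println(matchSubject("*.capabilities.*", "udp.capabilities.ports"))
	fmt.Println(matchSubject("udp.>", "udp.capabilities.ports"))
	fmt.Println(matchSubject("udp.>", "udp"))
	fmt.Println(matchSubject("udp.*", "udp.capabilities.ports"))
}
```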
func (*Registry) GetComponent ¶
func (r *Registry) GetComponent(name string) (Discoverable, error)
GetComponent retrieves a component instance by factory type name (for schema retrieval). Deprecated: use GetComponentSchema() instead for schema retrieval. This method creates a temporary component instance, which fails for components with dependency validation.
func (*Registry) GetComponentSchema ¶
func (r *Registry) GetComponentSchema(name string) (ConfigSchema, error)
GetComponentSchema retrieves a component's schema directly from Registration metadata, without instantiating the component (Feature 011 - Option 1). The schema is stored as static metadata during registration, which avoids dependency validation issues.
func (*Registry) GetFactory ¶
GetFactory returns a specific factory by name. Unlike ListFactories, this returns the actual Factory function for creating components.
func (*Registry) InitNATS ¶
InitNATS initializes NATS JetStream capability discovery using natsclient.Client. Creates the COMPONENT_CAPABILITIES stream if it doesn't exist.
func (*Registry) ListAvailable ¶
ListAvailable returns information about all available component types. This provides metadata about what types of components can be created.
func (*Registry) ListComponentTypes ¶
ListComponentTypes returns all registered component factory type names. These are factory names (e.g., "udp-input", "websocket-output"), not instance names.
func (*Registry) ListComponents ¶
func (r *Registry) ListComponents() map[string]Discoverable
ListComponents returns all registered component instances. This is used by the discovery service to provide information about currently running components.
func (*Registry) ListFactories ¶
func (r *Registry) ListFactories() map[string]*Registration
ListFactories returns all registered component factories. This provides information about what types of components can be created.
func (*Registry) ListPayloads ¶
func (r *Registry) ListPayloads() map[string]*PayloadRegistration
ListPayloads returns all registered payload types.
func (*Registry) RegisterFactory ¶
func (r *Registry) RegisterFactory(name string, registration *Registration) error
RegisterFactory registers a component factory with the given name. It returns an error if a factory with the same name is already registered.
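The duplicate-rejection contract described above, combined with thread-safe access, is commonly implemented with a mutex-guarded map. The following is a minimal sketch under that assumption; factoryRegistry and its methods are hypothetical, and how Registry implements this internally is not shown in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// factoryRegistry is a hypothetical, trimmed-down registry keyed by name.
type factoryRegistry struct {
	mu        sync.RWMutex
	factories map[string]func() any
}

func newFactoryRegistry() *factoryRegistry {
	return &factoryRegistry{factories: make(map[string]func() any)}
}

// register stores factory under name and rejects duplicate names.
func (r *factoryRegistry) register(name string, factory func() any) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.factories[name]; exists {
		return fmt.Errorf("factory %q is already registered", name)
	}
	r.factories[name] = factory
	return nil
}

// count returns the number of registered factories under a read lock.
func (r *factoryRegistry) count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.factories)
}

func main() {
	reg := newFactoryRegistry()
	newUDP := func() any { return "udp instance" }

	fmt.Println("first:", reg.register("udp", newUDP))
	fmt.Println("second:", reg.register("udp", newUDP))
	fmt.Println("registered:", reg.count())
}
```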
func (*Registry) RegisterInstance ¶
func (r *Registry) RegisterInstance(name string, component Discoverable) error
RegisterInstance registers a component instance with the given name. This allows the instance to be discovered and managed. It returns an error if an instance with the same name is already registered.
func (*Registry) RegisterPayload ¶
func (r *Registry) RegisterPayload(registration *PayloadRegistration) error
RegisterPayload registers a payload factory with the registry. This allows typed payloads to be recreated during message deserialization.
func (*Registry) RegisterWithConfig ¶
func (r *Registry) RegisterWithConfig(config RegistrationConfig) error
RegisterWithConfig registers a component using a configuration struct. This is the recommended registration method that replaces the multi-parameter functions.
Example usage:
registry.RegisterWithConfig(component.RegistrationConfig{
Name: "udp",
Factory: CreateUDPInput,
Schema: udpSchema,
Type: "input",
Protocol: "udp",
Domain: "network",
Description: "UDP input component for receiving network data",
Version: "1.0.0",
})
func (*Registry) StartHeartbeat ¶
StartHeartbeat starts periodic republishing of all component capabilities.
func (*Registry) StopHeartbeat ¶
func (r *Registry) StopHeartbeat()
StopHeartbeat stops the heartbeat goroutine.
func (*Registry) SubscribeCapabilities ¶
SubscribeCapabilities subscribes to capability announcements from NATS. If no patterns are provided, it defaults to "*.capabilities.*" (all components).
func (*Registry) UnregisterInstance ¶
UnregisterInstance removes a component instance from the registry. This is typically called when a component is stopped or destroyed.
func (*Registry) WaitForCapabilities ¶
func (r *Registry) WaitForCapabilities(ctx context.Context, pattern string, minCount int, timeout time.Duration) error
WaitForCapabilities waits until at least minCount capabilities matching pattern have been discovered. It returns immediately if len(GetCapabilities(pattern)) >= minCount, and returns ctx.Err() on context cancellation. On timeout it returns nil: callers are expected to proceed anyway, so a timeout is deliberately not an error. It polls every 100ms.
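The three return paths described above (already satisfied, cancelled, timed out) can be sketched as a polling loop. waitForCount and runExamples are hypothetical; the count callback stands in for len(GetCapabilities(pattern)), and the polling interval is a parameter here so the example runs quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForCount polls count every interval until it reaches minCount.
// It returns ctx.Err() if ctx is cancelled and nil when the timeout
// elapses, mirroring the "proceed anyway" behavior described above.
// interval must be positive, since time.NewTicker panics otherwise.
func waitForCount(ctx context.Context, count func() int, minCount int, timeout, interval time.Duration) error {
	if count() >= minCount {
		return nil // already satisfied, no waiting needed
	}
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return nil // timeout is deliberately not an error
		case <-ticker.C:
			if count() >= minCount {
				return nil
			}
		}
	}
}

// runExamples exercises the timeout and cancellation paths.
func runExamples() (timeoutErr, cancelErr error) {
	never := func() int { return 0 }

	// Timeout path: nothing is ever discovered, yet the result is nil.
	timeoutErr = waitForCount(context.Background(), never, 1, 50*time.Millisecond, 10*time.Millisecond)

	// Cancellation path: a cancelled context surfaces as ctx.Err().
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelErr = waitForCount(ctx, never, 1, time.Second, 10*time.Millisecond)
	return timeoutErr, cancelErr
}

func main() {
	timeoutErr, cancelErr := runExamples()
	fmt.Println("timeout:", timeoutErr)
	fmt.Println("cancelled:", cancelErr)
}
```

Because a timeout yields nil, a caller that needs to know whether the minimum was actually reached has to re-check the count after the call returns.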
type SchemaDirectives ¶
type SchemaDirectives struct {
// core (required)
Type string // REQUIRED - field type
Description string // REQUIRED (warning if missing)
// UI Organization
Category string // "basic" or "advanced"
ReadOnly bool // For PortDefinition fields
Editable bool // For PortDefinition fields
Hidden bool // Hide from UI
// Constraints
Default any // Type-specific default value (stored as string, converted during schema generation)
Required bool // Field must be provided
Min *int // Numeric minimum
Max *int // Numeric maximum
Enum []string // Valid enum values
// Future extensions (stored but not used yet)
Help string
Placeholder string
Pattern string
Format string
}
SchemaDirectives represents parsed schema tag directives
func ParseSchemaTag ¶
func ParseSchemaTag(tag string) (SchemaDirectives, error)
ParseSchemaTag parses a schema struct tag into directives.
Tag Syntax:
- Directives are comma-separated
- Key-value pairs use colon: "key:value"
- Boolean flags have no colon: "readonly", "required"
- Enum values are pipe-separated: "enum:val1|val2|val3"
- Whitespace is trimmed from all values
Required Directives:
- type: Field data type (string, int, bool, float, enum, array, object, ports)
Recommended Directives:
- description: Human-readable field description (used for UI and documentation)
Optional Directives:
- category: UI organization (basic, advanced)
- default: Default value (converted to appropriate type)
- min/max: Numeric constraints
- enum: Valid values for enum types (pipe-separated)
- readonly: Field is read-only (boolean flag)
- editable: Field is user-editable (boolean flag)
- hidden: Field is hidden from UI (boolean flag)
- required: Field must be provided (boolean flag)
Example Tags:
schema:"type:string,description:Component name,category:basic"
schema:"type:int,description:Port,min:1,max:65535,default:8080"
schema:"type:enum,description:Level,enum:debug|info|warn,default:info"
schema:"required,type:string,description:API key"
schema:"readonly,type:string,description:System ID"
Returns an error if:
- Tag is empty
- Type directive is missing
- Type value is invalid
- Directive syntax is malformed
- Numeric values cannot be parsed
See SCHEMA_TAG_SPEC.md for complete specification.
Example ¶
ExampleParseSchemaTag demonstrates parsing individual schema tags
package main
import (
"fmt"
"github.com/c360studio/semstreams/component"
)
func main() {
// Parse a simple field tag
tag := "type:int,description:Port number,min:1,max:65535,default:8080"
directives, err := component.ParseSchemaTag(tag)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
fmt.Printf("Type: %s\n", directives.Type)
fmt.Printf("Description: %s\n", directives.Description)
fmt.Printf("Min: %d\n", *directives.Min)
fmt.Printf("Max: %d\n", *directives.Max)
fmt.Printf("Default: %s\n", directives.Default)
}
Output:
Type: int
Description: Port number
Min: 1
Max: 65535
Default: 8080
Example (Enum) ¶
ExampleParseSchemaTag_enum demonstrates parsing enum tags
package main
import (
"fmt"
"github.com/c360studio/semstreams/component"
)
func main() {
tag := "type:enum,description:Log level,enum:debug|info|warn|error,default:info"
directives, _ := component.ParseSchemaTag(tag)
fmt.Printf("Type: %s\n", directives.Type)
fmt.Printf("Description: %s\n", directives.Description)
fmt.Printf("Enum values: %v\n", directives.Enum)
fmt.Printf("Default: %s\n", directives.Default)
}
Output:
Type: enum
Description: Log level
Enum values: [debug info warn error]
Default: info
Example (Flags) ¶
ExampleParseSchemaTag_flags demonstrates boolean flags
package main
import (
"fmt"
"github.com/c360studio/semstreams/component"
)
func main() {
tag := "required,readonly,type:string,description:System identifier"
directives, _ := component.ParseSchemaTag(tag)
fmt.Printf("Type: %s\n", directives.Type)
fmt.Printf("Required: %v\n", directives.Required)
fmt.Printf("ReadOnly: %v\n", directives.ReadOnly)
}
Output:
Type: string
Required: true
ReadOnly: true
type SimpleMockComponent ¶
type SimpleMockComponent struct {
// contains filtered or unexported fields
}
SimpleMockComponent is a simple component implementation for testing
func (*SimpleMockComponent) ConfigSchema ¶
func (m *SimpleMockComponent) ConfigSchema() ConfigSchema
ConfigSchema returns the component's configuration schema
func (*SimpleMockComponent) DataFlow ¶
func (m *SimpleMockComponent) DataFlow() FlowMetrics
DataFlow returns the component's data flow metrics (zeros for mock)
func (*SimpleMockComponent) Health ¶
func (m *SimpleMockComponent) Health() HealthStatus
Health returns the component's health status (always healthy for mock)
func (*SimpleMockComponent) InputPorts ¶
func (m *SimpleMockComponent) InputPorts() []Port
InputPorts returns the component's input ports (none for mock)
func (*SimpleMockComponent) Meta ¶
func (m *SimpleMockComponent) Meta() Metadata
Meta returns the component metadata
func (*SimpleMockComponent) OutputPorts ¶
func (m *SimpleMockComponent) OutputPorts() []Port
OutputPorts returns the component's output ports (none for mock)
type State ¶
type State int
State represents the current lifecycle state of a component
const (
// StateCreated indicates component was created but not initialized
StateCreated State = iota
// StateInitialized indicates component was initialized but not started
StateInitialized
// StateStarted indicates component is running
StateStarted
// StateStopped indicates component was stopped
StateStopped
// StateFailed indicates component failed during lifecycle operation
StateFailed
)
type Status ¶
type Status struct {
Component string `json:"component"`
Stage string `json:"stage"`
CycleID string `json:"cycle_id,omitempty"`
CycleStartedAt time.Time `json:"cycle_started_at,omitempty"`
StageStartedAt time.Time `json:"stage_started_at"`
LastCompletedAt time.Time `json:"last_completed_at,omitempty"`
LastResult string `json:"last_result,omitempty"` // "success" or "error"
LastError string `json:"last_error,omitempty"`
}
Status represents the current processing state of a component. It is used by the ADR-003 lifecycle status pattern for asynchronous component observability.
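The sketch below marshals a local copy of Status to show the resulting JSON shape. One detail worth knowing is that the omitempty option in encoding/json never omits struct values, so zero time.Time fields such as cycle_started_at are still emitted (as "0001-01-01T00:00:00Z"), while empty strings are dropped. Consumers may therefore want to test for the zero time instead of a missing key. The component and stage names are invented.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// status is a local copy of the Status struct shown above.
type status struct {
	Component       string    `json:"component"`
	Stage           string    `json:"stage"`
	CycleID         string    `json:"cycle_id,omitempty"`
	CycleStartedAt  time.Time `json:"cycle_started_at,omitempty"`
	StageStartedAt  time.Time `json:"stage_started_at"`
	LastCompletedAt time.Time `json:"last_completed_at,omitempty"`
	LastResult      string    `json:"last_result,omitempty"`
	LastError       string    `json:"last_error,omitempty"`
}

// encode marshals s and also decodes the result into a generic map so the
// set of emitted keys can be inspected.
func encode(s status) (string, map[string]any, error) {
	data, err := json.Marshal(s)
	if err != nil {
		return "", nil, err
	}
	var keys map[string]any
	if err := json.Unmarshal(data, &keys); err != nil {
		return "", nil, err
	}
	return string(data), keys, nil
}

func main() {
	s := status{
		Component:      "graph-ingest", // hypothetical component name
		Stage:          "indexing",     // hypothetical stage name
		StageStartedAt: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
	out, keys, err := encode(s)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
	_, hasCycleID := keys["cycle_id"]
	_, hasCycleStart := keys["cycle_started_at"]
	fmt.Println("cycle_id present:", hasCycleID)
	fmt.Println("cycle_started_at present:", hasCycleStart)
}
```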
type ThrottledLifecycleReporter ¶
type ThrottledLifecycleReporter struct {
// contains filtered or unexported fields
}
ThrottledLifecycleReporter wraps any LifecycleReporter to enforce minimum intervals between KV writes. Important events (CycleComplete, CycleError) are never throttled.
func NewThrottledLifecycleReporter ¶
func NewThrottledLifecycleReporter(delegate LifecycleReporter, minInterval time.Duration, logger *slog.Logger) *ThrottledLifecycleReporter
NewThrottledLifecycleReporter creates a throttled wrapper around any LifecycleReporter. minInterval is the minimum time between writes (default 1 second if zero).
func (*ThrottledLifecycleReporter) FlushPending ¶
func (r *ThrottledLifecycleReporter) FlushPending(ctx context.Context) error
FlushPending writes any pending stage to storage immediately. Call this before component shutdown or when immediate status update is needed.
func (*ThrottledLifecycleReporter) ReportCycleComplete ¶
func (r *ThrottledLifecycleReporter) ReportCycleComplete(ctx context.Context) error
ReportCycleComplete marks successful cycle completion. NEVER throttled - always written immediately. Also flushes any pending stage.
func (*ThrottledLifecycleReporter) ReportCycleError ¶
func (r *ThrottledLifecycleReporter) ReportCycleError(ctx context.Context, err error) error
ReportCycleError marks cycle failure with error details. NEVER throttled - always written immediately. Also flushes any pending stage.
func (*ThrottledLifecycleReporter) ReportCycleStart ¶
func (r *ThrottledLifecycleReporter) ReportCycleStart(ctx context.Context) error
ReportCycleStart marks the beginning of a new processing cycle. Throttled: queued if within minInterval window.
func (*ThrottledLifecycleReporter) ReportStage ¶
func (r *ThrottledLifecycleReporter) ReportStage(ctx context.Context, stage string) error
ReportStage updates the component's current processing stage. Throttled: if within minInterval, queues stage for next write window.
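The queue-then-flush behavior described for ReportStage and FlushPending can be sketched with a last-write timestamp and a single pending slot. stageThrottle and demo are hypothetical and heavily simplified: there is no delegate, context, or background timer, the clock is injected so the example is deterministic, and only the most recent queued stage is kept. How the real type schedules deferred writes is not specified in this document.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stageThrottle is a hypothetical, simplified throttle around a write function.
type stageThrottle struct {
	mu          sync.Mutex
	minInterval time.Duration
	now         func() time.Time   // injectable clock for tests
	write       func(stage string) // stands in for the delegate's KV write
	lastWrite   time.Time
	pending     string
}

// reportStage writes immediately when the interval has elapsed; otherwise it
// remembers only the most recent stage for a later flush.
func (t *stageThrottle) reportStage(stage string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := t.now()
	if !t.lastWrite.IsZero() && now.Sub(t.lastWrite) < t.minInterval {
		t.pending = stage
		return
	}
	t.pending = ""
	t.lastWrite = now
	t.write(stage)
}

// flushPending writes the queued stage, if any, regardless of the interval.
func (t *stageThrottle) flushPending() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pending == "" {
		return
	}
	stage := t.pending
	t.pending = ""
	t.lastWrite = t.now()
	t.write(stage)
}

// demo drives the throttle with a fake clock and returns the stages written.
func demo() []string {
	clock := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	var written []string
	t := &stageThrottle{
		minInterval: time.Second,
		now:         func() time.Time { return clock },
		write:       func(s string) { written = append(written, s) },
	}
	t.reportStage("load") // first report: written immediately
	clock = clock.Add(200 * time.Millisecond)
	t.reportStage("parse") // inside the window: queued
	clock = clock.Add(200 * time.Millisecond)
	t.reportStage("index") // still inside the window: replaces "parse"
	t.flushPending()       // writes the queued "index"
	return written
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the intermediate "parse" stage is never written, which is the trade-off a throttle makes: fewer writes in exchange for losing short-lived intermediate states.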
type TimerPort ¶
type TimerPort struct {
Interval string `json:"interval"` // Duration string e.g. "30s", "1m"
Interface *InterfaceContract `json:"interface,omitempty"`
}
TimerPort represents a periodic timer trigger port
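The example values "30s" and "1m" are in the format accepted by time.ParseDuration from the standard library. Assuming that is how Interval is meant to be interpreted, a caller could validate it as sketched below; parseInterval is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// parseInterval converts an Interval string such as "30s" or "1m" into a
// time.Duration. It rejects non-positive values, which time.NewTicker
// would panic on.
func parseInterval(s string) (time.Duration, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid interval %q: %w", s, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("interval %q must be positive", s)
	}
	return d, nil
}

func main() {
	for _, s := range []string{"30s", "1m", "0s", "soon"} {
		d, err := parseInterval(s)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("%s parses to %v\n", s, d)
	}
}
```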
func (TimerPort) IsExclusive ¶
IsExclusive returns false as multiple timers can run independently
func (TimerPort) ResourceID ¶
ResourceID returns unique identifier for timer ports
type Validatable ¶
type Validatable interface {
Validate() error
}
Validatable interface for configs that can self-validate
type ValidationError ¶
type ValidationError struct {
Field string `json:"field"` // Name of the field that failed validation
Message string `json:"message"` // Human-readable error message
Code string `json:"code"` // Machine-readable error code (see list below)
}
ValidationError represents a validation error for a specific configuration field. It provides structured error information that can be displayed to users and mapped to specific form fields in the UI.
Error codes are standardized across frontend and backend:
- "required": Field is required but missing
- "min": Numeric value below minimum threshold
- "max": Numeric value above maximum threshold
- "enum": Value not in allowed enum values
- "type": Value doesn't match expected type (string, int, bool, etc.)
- "pattern": String doesn't match required pattern (future use)
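To make the codes concrete, the sketch below produces "min" and "max" errors for an integer field and then groups messages by field, which is one way a UI could attach them to form inputs. The struct is a local copy of ValidationError; checkBounds, byField, and the message wording are hypothetical and do not reflect how ValidateConfig is implemented.

```go
package main

import "fmt"

// validationError is a local copy of ValidationError.
type validationError struct {
	Field   string `json:"field"`
	Message string `json:"message"`
	Code    string `json:"code"`
}

// checkBounds returns a "min" or "max" error when value falls outside the
// optional bounds. A nil bound means "no limit". Message wording is
// illustrative only.
func checkBounds(field string, value int, lo, hi *int) []validationError {
	var errs []validationError
	if lo != nil && value < *lo {
		errs = append(errs, validationError{
			Field:   field,
			Message: fmt.Sprintf("must be at least %d", *lo),
			Code:    "min",
		})
	}
	if hi != nil && value > *hi {
		errs = append(errs, validationError{
			Field:   field,
			Message: fmt.Sprintf("must be at most %d", *hi),
			Code:    "max",
		})
	}
	return errs
}

// byField groups messages by field name so a form can show them inline.
func byField(errs []validationError) map[string][]string {
	grouped := make(map[string][]string)
	for _, e := range errs {
		grouped[e.Field] = append(grouped[e.Field], e.Message)
	}
	return grouped
}

func main() {
	lo, hi := 1, 65535
	errs := checkBounds("port", 99999, &lo, &hi)
	for _, e := range errs {
		fmt.Printf("%s [%s]: %s\n", e.Field, e.Code, e.Message)
	}
	fmt.Println(byField(errs))
}
```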
func ValidateConfig ¶
func ValidateConfig(config map[string]any, schema ConfigSchema) []ValidationError
ValidateConfig validates a configuration map against a ConfigSchema. It checks required fields, type constraints, min/max bounds, and enum values.
The validation is lenient - unknown fields are allowed to support backward compatibility and future schema evolution. Only explicitly defined properties are validated against their schema constraints.
Returns a slice of ValidationError containing all validation failures found. An empty slice indicates the configuration is valid.
Example usage:
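// ptrInt is assumed to be a caller-defined helper that returns a pointer to
// its argument, e.g. func ptrInt(v int) *int { return &v }.
schema := component.ConfigSchema{
Properties: map[string]component.PropertySchema{
"port": {
Type: "int",
Minimum: ptrInt(1),
Maximum: ptrInt(65535),
Category: "basic",
},
},
Required: []string{"port"},
}
config := map[string]any{"port": 99999}
// Named errs rather than errors to avoid shadowing the standard errors package.
errs := component.ValidateConfig(config, schema)
if len(errs) > 0 {
// Handle validation errors
fmt.Printf("Validation failed: %s\n", errs[0].Message)
}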