component

package
v1.0.0-alpha
Published: Mar 3, 2026 License: MIT Imports: 27 Imported by: 0

README

Component Package

Core component infrastructure for SemStreams, providing registration, discovery, lifecycle management, and instance creation with explicit registration patterns.

Overview

The component package defines the fundamental abstractions for all SemStreams components, enabling dynamic discovery, registration, and management of input, processor, output, and storage components. This package follows explicit registration patterns with dependency injection through structured configuration.

Components in SemStreams are self-describing units that can be discovered at runtime, configured through schemas, and managed through their lifecycle. The package supports four types of components: inputs (data sources), processors (data transformers), outputs (data sinks), and storage (persistence).

The Registry serves as the central component management system, handling both factory registration and instance management with thread-safe operations and proper lifecycle control.

Installation

import "github.com/c360/semstreams/component"

Architecture

Component Registration Flow

SemStreams uses EXPLICIT registration rather than init() self-registration:

flowchart TB
    subgraph Packages["Component Packages"]
        UDP[pkg/input/udp.go]
        WS[pkg/output/websocket.go]
        Graph[pkg/processor/graph]
        Robotics[pkg/processor/robotics]
        ObjStore[pkg/storage/objectstore]
    end

    subgraph Orchestration["componentregistry Package"]
        RegisterAll[RegisterAll function]
    end

    subgraph Main["main.go"]
        CreateReg[Create Registry]
        CallRegAll[Call RegisterAll]
        Ready[Components Ready]
    end

    UDP -->|"Register(registry)"| RegisterAll
    WS -->|"Register(registry)"| RegisterAll
    Graph -->|"Register(registry)"| RegisterAll
    Robotics -->|"Register(registry)"| RegisterAll
    ObjStore -->|"Register(registry)"| RegisterAll

    CreateReg --> CallRegAll
    RegisterAll --> CallRegAll
    CallRegAll --> Ready

    style Packages fill:#e1f5ff
    style Orchestration fill:#d4edda
    style Main fill:#fff3cd
Registration Pattern

Each component package exports a Register() function:

sequenceDiagram
    participant Main as main.go
    participant CR as componentregistry
    participant Reg as Registry
    participant UDP as pkg/input
    participant Graph as pkg/processor/graph

    Main->>Reg: NewRegistry()
    Main->>CR: RegisterAll(registry)
    CR->>UDP: Register(registry)
    UDP->>Reg: RegisterInput("udp", factory, ...)
    CR->>Graph: Register(registry)
    Graph->>Reg: RegisterProcessor("graph-processor", factory, ...)
    CR-->>Main: All components registered
    Main->>Reg: CreateComponent("udp-1", config, deps)
    Reg-->>Main: component instance
Why Explicit Registration?
| Aspect | init() Self-Registration | Explicit Registration |
|---|---|---|
| Testability | ❌ Global state, hard to isolate | ✅ Create isolated test registries |
| Explicitness | ❌ Hidden dependencies via imports | ✅ Clear dependency graph |
| Control | ❌ Automatic on import | ✅ Application controls what/when |
| Side Effects | ❌ Package import modifies globals | ✅ No side effects from imports |
| Debugging | ❌ Registration order unclear | ✅ Deterministic, explicit order |
FlowGraph Component

The FlowGraph component (flowgraph/) provides static analysis and validation of component interconnections. It is used by the flow engine's validator to analyze flow definitions before deployment.

Purpose: Build and validate connectivity graphs from component port definitions

Key Responsibilities:

  • Build connectivity graphs from component port definitions
  • Auto-discover connections via pattern matching (NATS subjects, KV buckets)
  • Detect orphaned ports and disconnected components
  • Validate interface contracts between connected ports
  • Identify resource conflicts (e.g., network port binding)

Important: FlowGraph is a validation tool, not a runtime component. It creates temporary graph structures for analysis during flow validation and discards them once validation completes. The flow engine uses FlowGraph during pre-deployment validation but not during runtime execution.

Relationship to Flow Infrastructure:

Flow Service (HTTP API)
    ↓ uses
Flow Engine (Lifecycle Orchestration)
    ↓ validation → FlowGraph (Static Analysis)
    ↓ deployment → Component Manager (Runtime)
  • FlowGraph: "Can these components connect?" (static graph analysis)
  • Flow Engine: "Deploy/start/stop this flow" (lifecycle orchestration)
  • Flow Service: "HTTP interface to flow operations" (REST API layer)

Each layer has distinct, non-overlapping responsibilities.
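
The connection discovery and orphan detection described above can be illustrated with a small sketch. It assumes standard NATS wildcard semantics (`*` matches exactly one token, `>` matches one or more trailing tokens). The helpers `subjectMatches` and `orphanedOutputs` are hypothetical names chosen for illustration and are not part of the flowgraph API.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject matches a subscription
// pattern. "*" matches exactly one token and ">" matches one or more
// trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// orphanedOutputs returns the published subjects that no input pattern matches.
func orphanedOutputs(outputs, inputs []string) []string {
	var orphans []string
	for _, out := range outputs {
		matched := false
		for _, in := range inputs {
			if subjectMatches(in, out) {
				matched = true
				break
			}
		}
		if !matched {
			orphans = append(orphans, out)
		}
	}
	return orphans
}

func main() {
	outputs := []string{"events.udp.raw", "metrics.cpu"}
	inputs := []string{"events.>", "data.*"}
	fmt.Println(orphanedOutputs(outputs, inputs)) // prints [metrics.cpu]
}
```

A real validator would also have to consider KV buckets, interface contracts, and network port conflicts; this sketch covers subject matching only.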

Quick Start

Basic Usage
package main

import (
    "encoding/json"
    "log"
    "log/slog"

    "github.com/c360/semstreams/component"
    "github.com/c360/semstreams/componentregistry"
    "github.com/c360/semstreams/types"
)

func main() {
    // Create registry and register all components
    registry := component.NewRegistry()
    if err := componentregistry.RegisterAll(registry); err != nil {
        log.Fatal(err)
    }

    // Create component configuration
    config := types.ComponentConfig{
        Type:    types.ComponentTypeInput,
        Name:    "udp",
        Enabled: true,
        Config:  json.RawMessage(`{"port": 8080, "bind": "0.0.0.0"}`),
    }

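    // Prepare dependencies (natsClient is a *natsclient.Client assumed to be
    // connected elsewhere; its construction is omitted here)
    deps := component.Dependencies{
        NATSClient: natsClient,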
        Platform: component.PlatformMeta{
            Org:      "c360",
            Platform: "platform1",
        },
        Logger: slog.Default(),
    }

    // Create component instance
    instance, err := registry.CreateComponent("udp-input-1", config, deps)
    if err != nil {
        log.Fatal(err)
    }

    // Component is ready to use
    log.Printf("Created: %s", instance.Meta().Name)
}
Implementing a Component
package mycomponent

import (
    "encoding/json"

    "github.com/c360/semstreams/component"
)

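// MyConfig holds the component's settings. The original snippet omits this
// type; the single field is an assumption matching the "interval" property
// declared in ConfigSchema below.
type MyConfig struct {
    Interval string `json:"interval"`
}

// Component implementation
type MyInput struct {
    config MyConfig
    deps   component.Dependencies
}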

func (m *MyInput) Meta() component.Metadata {
    return component.Metadata{
        Name:        "my-input",
        Type:        "input",
        Description: "My custom input component",
        Version:     "1.0.0",
    }
}

func (m *MyInput) InputPorts() []component.Port { return nil }

func (m *MyInput) OutputPorts() []component.Port {
    return []component.Port{
        {
            Name:      "output",
            Direction: component.DirectionOutput,
            Required:  true,
            Config:    component.NATSPort{Subject: "my.output"},
        },
    }
}

func (m *MyInput) ConfigSchema() component.ConfigSchema {
    return component.ConfigSchema{
        Properties: map[string]component.PropertySchema{
            "interval": {Type: "duration", Description: "Poll interval"},
        },
    }
}

func (m *MyInput) Health() component.HealthStatus {
    return component.HealthStatus{Healthy: true}
}

func (m *MyInput) DataFlow() component.FlowMetrics {
    return component.FlowMetrics{}
}

// Factory function
func CreateMyInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error) {
    var config MyConfig
    if err := json.Unmarshal(rawConfig, &config); err != nil {
        return nil, err
    }

    return &MyInput{
        config: config,
        deps:   deps,
    }, nil
}

// IMPORTANT: Export Register() function, NOT init().
// myInputSchema is assumed to be a package-level schema value defined elsewhere.
func Register(registry *component.Registry) error {
    return registry.RegisterWithConfig(component.RegistrationConfig{
        Name:        "my-input",
        Factory:     CreateMyInput,
        Schema:      myInputSchema,
        Type:        "input",
        Protocol:    "custom",
        Domain:      "network",
        Description: "My custom input component",
        Version:     "1.0.0",
    })
}

Then add it to pkg/componentregistry/register.go:

import "github.com/yourorg/semstreams/pkg/mycomponent"

func registerCore(registry *component.Registry) error {
    // ... existing registrations

    if err := mycomponent.Register(registry); err != nil {
        return err
    }

    return nil
}

Core Concepts

Discoverable Interface

Every component must implement:

type Discoverable interface {
    Meta() Metadata                  // Component metadata
    InputPorts() []Port              // Input port definitions
    OutputPorts() []Port             // Output port definitions
    ConfigSchema() ConfigSchema      // Configuration schema
    Health() HealthStatus            // Current health status
    DataFlow() FlowMetrics           // Data flow metrics
}
Dependencies

Dependency injection structure:

type Dependencies struct {
    NATSClient      *natsclient.Client      // Required: messaging
    ObjectStore     ObjectStore             // Optional: persistence
    MetricsRegistry *metric.MetricsRegistry // Optional: Prometheus
    Logger          *slog.Logger            // Optional: logging
    Platform        PlatformMeta            // Required: identity
}
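
The required and optional markers above suggest a check that a factory or registry could run before using the dependencies. The following is a minimal sketch under stated assumptions: the types are local stand-ins rather than the real ones, `prepare` is a hypothetical helper, and treating an empty `Org` or `Platform` as "missing" is a guess at what "required" means here.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// Stand-ins for the real types; only the fields needed here are modelled.
type NATSClient struct{}

type PlatformMeta struct {
	Org      string
	Platform string
}

type Dependencies struct {
	NATSClient *NATSClient  // Required
	Logger     *slog.Logger // Optional
	Platform   PlatformMeta // Required
}

// prepare is a hypothetical helper: it rejects missing required fields and
// fills in a default logger when none was supplied.
func prepare(deps Dependencies) (Dependencies, error) {
	if deps.NATSClient == nil {
		return deps, errors.New("dependencies: NATSClient is required")
	}
	if deps.Platform.Org == "" || deps.Platform.Platform == "" {
		return deps, errors.New("dependencies: Platform.Org and Platform.Platform are required")
	}
	if deps.Logger == nil {
		deps.Logger = slog.Default()
	}
	return deps, nil
}

func main() {
	deps, err := prepare(Dependencies{
		NATSClient: &NATSClient{},
		Platform:   PlatformMeta{Org: "c360", Platform: "platform1"},
	})
	fmt.Println(err == nil, deps.Logger != nil)

	_, err = prepare(Dependencies{})
	fmt.Println(err)
}
```
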
Port Types

Components declare ports using strongly-typed configurations:

// NATS Pub/Sub
component.NATSPort{Subject: "data.output"}

// JetStream durable streaming
component.JetStreamPort{Stream: "EVENTS", Subject: "events.>"}

// KV bucket watch
component.KVWatchPort{Bucket: "CONFIG", Keys: []string{"app.*"}}

// KV bucket write
component.KVWritePort{
    Bucket: "ENTITY_STATES",
    Interface: &component.InterfaceContract{
        Type:    "graph.EntityState",
        Version: "v1",
    },
}

// Network binding
component.NetworkPort{Protocol: "udp", Port: 14550, Bind: "0.0.0.0"}

API Reference

Registry
NewRegistry() *Registry

Creates a new Registry with initialized maps.

CreateComponent(instanceName string, config types.ComponentConfig, deps Dependencies) (Discoverable, error)

Creates a component instance from a registered factory.

RegisterInput(name string, factory Factory, protocol, description, version string) error

Registers an input component factory.

RegisterProcessor(name string, factory Factory, protocol, description, version string) error

Registers a processor component factory.

RegisterOutput(name string, factory Factory, protocol, description, version string) error

Registers an output component factory.

RegisterStorage(name string, factory Factory, protocol, description, version string) error

Registers a storage component factory.

ListAvailable() map[string]Info

Returns metadata for all registered factories.

ListComponents() map[string]Discoverable

Returns all created component instances.

Types
Factory
type Factory func(rawConfig json.RawMessage, deps Dependencies) (Discoverable, error)

Factory function signature for component creation.

Error Handling

Error Types
ErrFactoryAlreadyExists // Duplicate factory registration
ErrInvalidFactory       // Invalid factory registration
ErrFactoryNotFound      // Unknown factory name
ErrComponentCreation    // Factory execution failed
ErrInstanceExists       // Instance name conflict
ErrInstanceNotFound     // Unknown instance
Error Detection
_, err := registry.CreateComponent("instance-1", config, deps)
if errors.Is(err, component.ErrFactoryNotFound) {
    // Configuration error - component type not registered
}
if errors.Is(err, component.ErrComponentCreation) {
    // Factory error - check component-specific logs
}
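
`errors.Is` only finds a sentinel if the returned error wraps it. The sketch below shows one way this can work, using `fmt.Errorf` with `%w`. The sentinels are local stand-ins, and `create`, `classify`, and `errBadPort` are hypothetical names; how the real Registry wraps its errors is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors.
var (
	ErrFactoryNotFound   = errors.New("factory not found")
	ErrComponentCreation = errors.New("component creation failed")
)

// errBadPort is an example of a component-specific failure.
var errBadPort = errors.New("bad port")

// create mimics a lookup-then-build flow. Wrapping with %w keeps the
// sentinel in the error chain so callers can use errors.Is.
func create(factories map[string]func() error, name string) error {
	factory, ok := factories[name]
	if !ok {
		return fmt.Errorf("%w: %q", ErrFactoryNotFound, name)
	}
	if err := factory(); err != nil {
		return fmt.Errorf("%w: %v", ErrComponentCreation, err)
	}
	return nil
}

// classify maps an error to a short label, the way a caller might branch.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrFactoryNotFound):
		return "not-registered"
	case errors.Is(err, ErrComponentCreation):
		return "factory-failed"
	default:
		return "unknown"
	}
}

func main() {
	factories := map[string]func() error{
		"udp":    func() error { return nil },
		"broken": func() error { return errBadPort },
	}
	for _, name := range []string{"udp", "broken", "missing"} {
		fmt.Println(name, classify(create(factories, name)))
	}
}
```
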

Testing

Isolated Test Registries
func TestMyComponent(t *testing.T) {
    // Create isolated registry for this test
    registry := component.NewRegistry()

    // Register only components needed
    if err := mycomponent.Register(registry); err != nil {
        t.Fatal(err)
    }

    // Create test dependencies
    deps := component.Dependencies{
        NATSClient: natsclient.NewTestClient(t),
        Platform: component.PlatformMeta{
            Org:      "test",
            Platform: "test",
        },
        Logger: slog.Default(),
    }

    // Test component creation (config is a types.ComponentConfig for "my-input",
    // built as in Basic Usage)
    instance, err := registry.CreateComponent("test-1", config, deps)
    assert.NoError(t, err)
    assert.Equal(t, "my-input", instance.Meta().Name)
}
Testing Patterns
  • ✅ Use real NATS via natsclient.NewTestClient() for integration tests
  • ✅ Create isolated registries per test to avoid global state
  • ✅ Mock external dependencies that cannot be containerized
  • ✅ Test component behavior through Discoverable interface
  • ✅ Verify factory registration and creation separately

Performance

Registry Operations
| Operation | Complexity | Thread-Safe |
|---|---|---|
| Factory lookup | O(1) | Yes (read lock) |
| Component creation | O(1) + factory time | Yes (read lock) |
| Factory registration | O(1) | Yes (write lock) |
| List operations | O(n) | Yes (read lock) |
Concurrency
  • Multiple goroutines can create components concurrently
  • Factory registration blocks component creation temporarily
  • No deadlocks due to ordered lock acquisition
  • Components maintain references until explicitly unregistered
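
The locking behaviour listed above can be sketched with a `sync.RWMutex`. This is one possible arrangement, not the package's actual implementation: `registry`, `register`, `create`, and `count` are hypothetical, and the factory type is simplified to return a string.

```go
package main

import (
	"fmt"
	"sync"
)

// Factory is simplified here: it only returns a label for the new instance.
type Factory func() string

// registry is a stripped-down stand-in for component.Registry.
type registry struct {
	mu        sync.RWMutex
	factories map[string]Factory
	instances map[string]string
}

func newRegistry() *registry {
	return &registry{factories: map[string]Factory{}, instances: map[string]string{}}
}

// register takes the write lock, briefly blocking concurrent lookups.
func (r *registry) register(name string, f Factory) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.factories[name]; ok {
		return fmt.Errorf("factory %q already exists", name)
	}
	r.factories[name] = f
	return nil
}

// create looks the factory up under the read lock, runs it without holding
// any lock, then takes the write lock only to record the instance.
func (r *registry) create(instance, factoryName string) error {
	r.mu.RLock()
	f, ok := r.factories[factoryName]
	r.mu.RUnlock()
	if !ok {
		return fmt.Errorf("factory %q not found", factoryName)
	}
	built := f()

	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.instances[instance]; exists {
		return fmt.Errorf("instance %q already exists", instance)
	}
	r.instances[instance] = built
	return nil
}

// count reads the instance map under the read lock.
func (r *registry) count() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.instances)
}

func main() {
	r := newRegistry()
	_ = r.register("udp", func() string { return "udp" })

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = r.create(fmt.Sprintf("udp-%d", i), "udp")
		}(i)
	}
	wg.Wait()
	fmt.Println(r.count()) // prints 8
}
```

Running the factory outside the lock keeps slow factories from blocking registration; the cost is that a duplicate instance name is only detected after the factory has already run.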

Architecture Decisions

Explicit Registration vs init()

Decision: Use explicit Register() functions

Rationale:

  • Testability: Can create isolated registries without global state
  • Explicitness: Clear component dependency graph in componentregistry
  • Control: Application controls what gets registered and when
  • No side effects: Package imports don't modify global state
  • Deterministic: Registration order is explicit and controllable

Tradeoffs:

  • Requires componentregistry orchestration package
  • Registration must be explicitly called in main()
  • New components must update componentregistry.RegisterAll()
Dependency Injection via Struct

Decision: Use Dependencies struct

Rationale:

  • Avoids parameter proliferation
  • Easy to add dependencies without breaking factories
  • Enables testing with mock dependencies
  • Follows service architecture patterns
Factory Pattern

Decision: Components parse their own configuration

Rationale:

  • Enables flexible validation per component
  • Matches service constructor patterns
  • Centralizes configuration knowledge in component packages

Documentation

Overview

Package component defines the Discoverable interface and related types.

Package component provides the core component infrastructure for SemStreams, enabling dynamic component discovery, registration, lifecycle management, and instance creation.

Overview

The component package defines fundamental abstractions for all SemStreams components, supporting four component types: inputs (data sources), processors (data transformers), outputs (data sinks), and storage (persistence). Components are self-describing units that can be discovered at runtime, configured through schemas, and managed through their lifecycle.

The Registry serves as the central component management system, handling both factory registration and instance management with thread-safe operations and proper lifecycle control.

Component Registration Pattern

SemStreams uses EXPLICIT registration rather than init() self-registration. This provides:

  • Testability: Can create isolated registries for testing
  • Explicitness: Clear component dependency graph
  • Control: Main application controls what gets registered
  • No side effects: No global state modification during package initialization

Registration Flow:

  1. Each component package exports a Register(*Registry) error function
  2. componentregistry.RegisterAll() orchestrates all registrations
  3. main.go explicitly calls RegisterAll() with a created Registry
  4. Components are now available for instantiation

Example component registration:

// In pkg/input/udp.go
func Register(registry *component.Registry) error {
	return registry.RegisterWithConfig(component.RegistrationConfig{
		Name:        "udp",
		Factory:     CreateUDPInput,
		Schema:      udpSchema,
		Type:        "input",
		Protocol:    "udp",
		Domain:      "network",
		Description: "UDP input component for network data",
		Version:     "1.0.0",
	})
}

// In pkg/componentregistry/register.go
func RegisterAll(registry *component.Registry) error {
	if err := input.Register(registry); err != nil {
		return err
	}
	if err := robotics.Register(registry); err != nil {
		return err
	}
	// ... more registrations
	return nil
}

// In cmd/semstreams/main.go
registry := component.NewRegistry()
if err := componentregistry.RegisterAll(registry); err != nil {
	log.Fatal(err)
}

Quick Start

Creating and using a component:

// Create component registry and register all components
registry := component.NewRegistry()
if err := componentregistry.RegisterAll(registry); err != nil {
	return err
}

// Create component configuration
config := types.ComponentConfig{
	Type:    types.ComponentTypeInput,
	Name:    "udp",
	Enabled: true,
	Config:  json.RawMessage(`{"port": 8080, "bind": "0.0.0.0"}`),
}

// Prepare component dependencies
deps := component.Dependencies{
	NATSClient: natsClient,
	Platform: component.PlatformMeta{
		Org:      "c360",
		Platform: "platform1",
	},
	Logger: slog.Default(),
}

// Create component instance
instance, err := registry.CreateComponent("udp-input-1", config, deps)
if err != nil {
	return err
}

// Component is now ready to use
meta := instance.Meta()
health := instance.Health()

Core Concepts

Discoverable Interface:

Every component must implement Discoverable, providing metadata, port definitions, configuration schema, health status, and data flow metrics. This enables runtime introspection and management.

Registry Pattern:

The Registry manages component factories and instances with thread-safe operations. Components register explicitly via Register() functions called by componentregistry, and the Registry handles creation and lifecycle management.

Dependencies:

All external dependencies (NATS client, object store, metrics, logger, platform identity) are injected through the Dependencies struct, following clean dependency injection patterns.

Port Types:

Components declare their inputs and outputs using strongly-typed ports that implement the Portable interface:

  • NATSPort: Core pub/sub messaging on NATS subjects
  • JetStreamPort: Durable streaming with JetStream for reliable delivery
  • KVWatchPort: Watch KV bucket changes for real-time state observation
  • KVWritePort: Declare writes to KV buckets for flow validation
  • NATSRequestPort: Request/reply pattern with timeouts
  • NetworkPort: TCP/UDP network bindings for external connectivity

Example port configuration:

func (c *MyComponent) OutputPorts() []component.Port {
	return []component.Port{
		{
			Name:      "data_stream",
			Direction: component.DirectionOutput,
			Required:  true,
			Config:    component.NATSPort{Subject: "data.output"},
		},
		{
			Name:      "entity_states",
			Direction: component.DirectionOutput,
			Required:  false,
			Config: component.KVWritePort{
				Bucket: "ENTITY_STATES",
				Interface: &component.InterfaceContract{
					Type:    "graph.EntityState",
					Version: "v1",
				},
			},
		},
	}
}

Configuration Schema

Components define their configuration through ConfigSchema, enabling:

  • Schema-driven UI generation with type-specific form inputs
  • Client-side and server-side validation before config persistence
  • Property categorization (basic vs advanced) for progressive disclosure
  • Default value population for improved user experience

Schema Definition Example:

func (u *UDPInput) ConfigSchema() component.ConfigSchema {
	return component.ConfigSchema{
		Properties: map[string]component.PropertySchema{
			"port": {
				Type:        "int",
				Description: "UDP port to listen on",
				Default:     14550,
				Minimum:     ptrInt(1),
				Maximum:     ptrInt(65535),
				Category:    "basic",  // Shown by default in UI
			},
			"bind_address": {
				Type:        "string",
				Description: "IP address to bind to (0.0.0.0 for all interfaces)",
				Default:     "0.0.0.0",
				Category:    "basic",
			},
			"buffer_size": {
				Type:        "int",
				Description: "UDP receive buffer size in bytes",
				Default:     8192,
				Minimum:     ptrInt(512),
				Category:    "advanced",  // Hidden in collapsible section
			},
		},
		Required: []string{"port", "bind_address"},
	}
}

Property Types:

  • "string": Text input, optional pattern validation
  • "int": Number input with min/max constraints
  • "bool": Checkbox input
  • "float": Number input allowing decimals
  • "enum": Dropdown select with predefined values
  • "object": Complex nested configuration (JSON editor fallback in MVP)
  • "array": List of values (JSON editor fallback in MVP)

Schema Validation:

Configurations are validated both client-side (instant feedback) and server-side (before persistence) using the ValidateConfig() function:

config := map[string]any{
	"port": 99999,  // Exceeds maximum
}

// Named validationErrs to avoid shadowing the standard errors package
validationErrs := component.ValidateConfig(config, schema)
if len(validationErrs) > 0 {
	// Returns: [{Field: "port", Message: "port must be <= 65535", Code: "max"}]
	// Frontend displays error next to the port field
}
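
To make the validation step concrete, here is a minimal sketch of a bounds check that yields errors in the shape shown above. Everything in it is a simplified stand-in: the types carry only the fields needed, and `validate` is a hypothetical reduction of `ValidateConfig` that handles required fields and integer bounds only.

```go
package main

import "fmt"

// Simplified stand-ins for the schema types; the real ones have more fields.
type PropertySchema struct {
	Type    string
	Minimum *int
	Maximum *int
}

type ConfigSchema struct {
	Properties map[string]PropertySchema
	Required   []string
}

type ValidationError struct {
	Field   string
	Message string
	Code    string
}

// validate checks required fields and integer bounds. Values are expected to
// be Go ints; JSON-decoded numbers (float64) would need conversion first.
func validate(config map[string]any, schema ConfigSchema) []ValidationError {
	var errs []ValidationError
	for _, name := range schema.Required {
		if _, ok := config[name]; !ok {
			errs = append(errs, ValidationError{Field: name, Message: name + " is required", Code: "required"})
		}
	}
	for name, prop := range schema.Properties {
		v, ok := config[name].(int)
		if !ok || prop.Type != "int" {
			continue
		}
		if prop.Minimum != nil && v < *prop.Minimum {
			errs = append(errs, ValidationError{Field: name, Message: fmt.Sprintf("%s must be >= %d", name, *prop.Minimum), Code: "min"})
		}
		if prop.Maximum != nil && v > *prop.Maximum {
			errs = append(errs, ValidationError{Field: name, Message: fmt.Sprintf("%s must be <= %d", name, *prop.Maximum), Code: "max"})
		}
	}
	return errs
}

func ptrInt(i int) *int { return &i }

func main() {
	schema := ConfigSchema{
		Properties: map[string]PropertySchema{
			"port": {Type: "int", Minimum: ptrInt(1), Maximum: ptrInt(65535)},
		},
		Required: []string{"port"},
	}
	fmt.Println(validate(map[string]any{"port": 99999}, schema))
}
```
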

Graceful Degradation:

Components without schemas still work - the UI falls back to a JSON editor. The system logs warnings when schemas are missing but continues operating:

// Component without schema - still functional
func (c *MyComponent) ConfigSchema() component.ConfigSchema {
	return component.ConfigSchema{}  // Empty schema
}
// UI will show: "Schema not available, using JSON editor"

Property Categorization:

The Category field organizes properties for progressive disclosure:

  • "basic": Common settings shown by default (port, host, enabled)
  • "advanced": Expert settings in collapsible section (buffer sizes, timeouts)
  • Empty/unset: Defaults to "advanced"

The UI renders basic properties first, then advanced ones in a collapsible <details> element. Properties within each category are sorted alphabetically for consistency.
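
The display order described above (basic first, then advanced, alphabetical within each group, unset treated as advanced) can be sketched as a two-key sort. `displayOrder` is a hypothetical helper and `PropertySchema` is reduced to one field; the real `SortedPropertyNames` may differ in signature and detail.

```go
package main

import (
	"fmt"
	"sort"
)

// PropertySchema is reduced to the single field this sketch needs.
type PropertySchema struct {
	Category string
}

// displayOrder returns property names with "basic" ones first, then all
// others, each group sorted alphabetically. An empty category counts as
// "advanced".
func displayOrder(props map[string]PropertySchema) []string {
	names := make([]string, 0, len(props))
	for name := range props {
		names = append(names, name)
	}
	rank := func(name string) int {
		if props[name].Category == "basic" {
			return 0
		}
		return 1
	}
	sort.Slice(names, func(i, j int) bool {
		ri, rj := rank(names[i]), rank(names[j])
		if ri != rj {
			return ri < rj
		}
		return names[i] < names[j]
	})
	return names
}

func main() {
	props := map[string]PropertySchema{
		"port":         {Category: "basic"},
		"bind_address": {Category: "basic"},
		"buffer_size":  {Category: "advanced"},
		"timeout":      {}, // unset category, treated as advanced
	}
	fmt.Println(displayOrder(props)) // prints [bind_address port buffer_size timeout]
}
```
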

Helper Functions:

  • GetProperties(schema, category): Filter properties by category
  • SortedPropertyNames(schema): Get property names in UI display order
  • IsComplexType(propType): Identify object/array types needing special handling
  • ValidateConfig(config, schema): Validate configuration against schema

Discoverable Interface

All components must implement the Discoverable interface:

type Discoverable interface {
	Meta() Metadata           // Component metadata (name, type, version)
	InputPorts() []Port       // Input port definitions
	OutputPorts() []Port      // Output port definitions
	ConfigSchema() ConfigSchema // Configuration schema for validation
	Health() HealthStatus     // Current health status
	DataFlow() FlowMetrics    // Data flow metrics (messages, bytes)
}

This interface enables:

  • Runtime introspection of component capabilities
  • Dynamic configuration validation
  • Health monitoring and metrics collection
  • Data flow visualization and debugging

Dependencies

Dependencies are injected through a structured dependencies object:

type Dependencies struct {
	NATSClient      *natsclient.Client      // Required: messaging
	ObjectStore     ObjectStore             // Optional: persistence
	MetricsRegistry *metric.MetricsRegistry // Optional: Prometheus metrics
	Logger          *slog.Logger            // Optional: structured logging
	Platform        PlatformMeta            // Required: platform identity
}

Benefits:

  • Clean dependency injection
  • Easy testing with mock dependencies
  • Avoids parameter proliferation in factory functions
  • Follows service architecture patterns

Factory Pattern

Component factories follow a consistent signature:

type Factory func(rawConfig json.RawMessage, deps Dependencies) (Discoverable, error)

Example factory implementation:

func CreateUDPInput(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error) {
	// Parse component-specific configuration
	var config UDPConfig
	if err := json.Unmarshal(rawConfig, &config); err != nil {
		return nil, fmt.Errorf("parse UDP config: %w", err)
	}

	// Validate configuration
	if err := config.Validate(); err != nil {
		return nil, fmt.Errorf("invalid UDP config: %w", err)
	}

	// Create component with dependencies
	return &UDPInput{
		config:     config,
		natsClient: deps.NATSClient,
		logger:     deps.Logger,
		platform:   deps.Platform,
	}, nil
}

Factories:

  • Receive raw JSON configuration and parse it themselves
  • Validate configuration before creating instances
  • Return initialized components ready to use
  • Follow service constructor patterns for consistency
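
The factory example calls `config.Validate()` without showing it. A minimal sketch of the parse-then-validate steps follows, assuming a hypothetical `UDPConfig` with `port` and `bind` fields (taken from the JSON used in Quick Start) and port bounds that mirror the package's MinPort and MaxPort constants. The real UDP component's config and validation rules are not shown in this document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Port bounds mirroring the package's MinPort and MaxPort constants.
const (
	minPort = 1
	maxPort = 65535
)

// UDPConfig is a hypothetical config struct; the field names are assumptions.
type UDPConfig struct {
	Port int    `json:"port"`
	Bind string `json:"bind"`
}

// Validate rejects values a factory should not accept.
func (c UDPConfig) Validate() error {
	if c.Port < minPort || c.Port > maxPort {
		return fmt.Errorf("port %d out of range [%d, %d]", c.Port, minPort, maxPort)
	}
	if c.Bind == "" {
		return fmt.Errorf("bind address must not be empty")
	}
	return nil
}

// parseUDPConfig performs the parse and validate steps of a factory.
func parseUDPConfig(raw json.RawMessage) (UDPConfig, error) {
	var cfg UDPConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, fmt.Errorf("parse UDP config: %w", err)
	}
	if err := cfg.Validate(); err != nil {
		return cfg, fmt.Errorf("invalid UDP config: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseUDPConfig(json.RawMessage(`{"port": 8080, "bind": "0.0.0.0"}`))
	fmt.Println(cfg.Port, cfg.Bind, err)

	_, err = parseUDPConfig(json.RawMessage(`{"port": 99999, "bind": "0.0.0.0"}`))
	fmt.Println(err)
}
```
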

Registry Thread Safety

All Registry operations are thread-safe:

  • Factory registration uses write locks
  • Component creation uses read locks for factory lookup
  • Instance tracking uses write locks
  • Listing operations use read locks

Concurrency characteristics:

  • Multiple goroutines can create components concurrently
  • Factory registration blocks component creation temporarily
  • ListAvailable() is safe to call during component creation
  • No deadlocks due to ordered lock acquisition

Error Handling

The package defines specific error types for different failure modes:

ErrFactoryAlreadyExists // Attempted to register duplicate factory
ErrInvalidFactory       // Invalid factory registration
ErrFactoryNotFound      // Attempted to create from unknown factory
ErrComponentCreation    // Factory returned error during creation
ErrInstanceExists       // Instance name already in use
ErrInstanceNotFound     // Attempted to access unknown instance

Error checking:

_, err := registry.CreateComponent("instance-1", config, deps)
if errors.Is(err, component.ErrFactoryNotFound) {
	// Handle missing factory - configuration error
}
if errors.Is(err, component.ErrComponentCreation) {
	// Handle factory failure - component-specific error
}

Testing

The explicit registration pattern makes testing straightforward:

// Create isolated test registry
registry := component.NewRegistry()

// Register only components needed for test
if err := input.Register(registry); err != nil {
	t.Fatal(err)
}

// Create test dependencies (real NATS test client, see Testing patterns below)
deps := component.Dependencies{
	NATSClient: natsclient.NewTestClient(t),
	Platform: component.PlatformMeta{
		Org:      "test",
		Platform: "test-platform",
	},
	Logger: slog.Default(),
}

// Test component creation
instance, err := registry.CreateComponent("test-1", config, deps)
if err != nil {
	t.Fatal(err)
}

// Verify component behavior through Discoverable interface
assert.Equal(t, "udp", instance.Meta().Name)
assert.True(t, instance.Health().Healthy)

Testing patterns:

  • Use real NATS client via natsclient.NewTestClient() for integration tests
  • Create isolated registries per test to avoid global state
  • Mock external dependencies that cannot be containerized
  • Test component behavior through Discoverable interface methods
  • Verify factory registration and component creation separately

Performance Considerations

Registry Performance:

  • Factory lookup: O(1) with map-based storage
  • Component creation: Factory execution time + O(1) registry overhead
  • Memory: Components maintain references in Registry until unregistered
  • Concurrency: Read-write mutex allows concurrent component creation

Component Lifecycle:

  • Components are created on-demand, not pre-instantiated
  • Registry holds strong references to created instances
  • Memory is released when components are unregistered
  • No automatic garbage collection of unused components

Architecture Decisions

Explicit Registration vs init() Self-Registration:

Decision: Use explicit Register() functions called by componentregistry

Benefits:

  • Testability: Can create isolated registries without global state
  • Explicitness: Clear component dependency graph in componentregistry
  • Control: Main application controls what gets registered and when
  • No side effects: Package imports don't modify global state
  • Deterministic: Registration order is explicit and controllable

Tradeoffs:

  • Requires componentregistry orchestration package
  • Registration must be explicitly called in main()
  • New components must update componentregistry.RegisterAll()

Registry-Based Architecture vs Distributed Catalog:

Decision: Use centralized Registry for component management

Benefits:

  • Simpler to reason about and test
  • Single source of truth for component management
  • Thread-safe operations with minimal overhead
  • No network dependencies for component discovery

Dependency Injection via Struct:

Decision: Use Dependencies struct instead of individual parameters

Benefits:

  • Avoids parameter proliferation in factory functions
  • Easy to add new dependencies without breaking existing factories
  • Enables easy testing with mock dependencies
  • Follows service architecture patterns

Factory Pattern for Component Creation:

Decision: Service-like constructors that parse their own configuration

Benefits:

  • Components handle their own configuration parsing
  • Enables flexible configuration validation per component
  • Matches service constructor signatures for consistency
  • Centralizes configuration knowledge in component packages

Integration Points

Dependencies:

  • pkg/natsclient: Required for NATS messaging
  • pkg/storage/objectstore: Optional for persistence
  • pkg/metric: Optional for Prometheus metrics
  • log/slog: Optional for structured logging (defaults to slog.Default())

Used By:

  • pkg/service: Manager uses Registry for component lifecycle
  • pkg/componentregistry: Orchestrates component registration
  • cmd/semstreams: Application entry point creates and populates Registry

Data Flow:

Configuration → Factory Lookup → Factory Execution → Component Instance → Registry

Examples

See component_test.go and registry_test.go for comprehensive examples including:

  • Component registration and creation
  • Factory validation
  • Instance management
  • Error handling patterns
  • Testing with isolated registries

Package component provides base types and utilities for SemStreams components.

Package component provides port configuration and management for component connections.

Package component provides schema validation and helper functions.

Package component provides schema tag parsing and generation for component configuration.

The schema tag system eliminates duplication between Config structs and ConfigSchema definitions by auto-generating schemas from struct tags. This provides a single source of truth for configuration metadata and follows Go stdlib patterns (similar to json tags).

Basic Usage

Define configuration with schema tags:

type MyConfig struct {
    Name string `json:"name" schema:"type:string,description:Component name,category:basic"`
    Port int    `json:"port" schema:"type:int,description:Port,min:1,max:65535,default:8080"`
}

Generate schema at init time:

var schema = component.GenerateConfigSchema(reflect.TypeOf(MyConfig{}))

Tag Syntax

Tags use comma-separated directives with colon-separated key-value pairs:

  • type:string - Field data type (required)
  • description:text - Field description (recommended)
  • category:basic - UI organization (basic or advanced)
  • default:value - Default value
  • min:N, max:N - Numeric constraints
  • enum:a|b|c - Valid enum values (pipe-separated)
  • readonly, editable - Boolean flags for PortDefinition fields
  • required, hidden - Boolean flags for validation and UI
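
The tag grammar above can be illustrated with a small parser. This is a naive sketch, not the package's parser: `parseSchemaTag` and `enumValues` are hypothetical helpers, bare flags are stored as `"true"` by convention, and values are assumed to contain no commas.

```go
package main

import (
	"fmt"
	"strings"
)

// parseSchemaTag splits a schema tag into key/value directives. Bare flags
// such as "required" are stored with the value "true".
func parseSchemaTag(tag string) map[string]string {
	directives := make(map[string]string)
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		key, value, found := strings.Cut(part, ":")
		if !found {
			value = "true"
		}
		directives[key] = value
	}
	return directives
}

// enumValues splits a pipe-separated enum directive.
func enumValues(directive string) []string {
	if directive == "" {
		return nil
	}
	return strings.Split(directive, "|")
}

func main() {
	d := parseSchemaTag("type:int,description:Port,min:1,max:65535,default:8080,required")
	fmt.Println(d["type"], d["min"], d["max"], d["required"]) // prints int 1 65535 true

	e := parseSchemaTag("type:enum,enum:a|b|c")
	fmt.Println(enumValues(e["enum"])) // prints [a b c]
}
```
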

Performance

Schema generation uses reflection but is designed for init-time execution:

  • Call GenerateConfigSchema once at package init
  • Cache result in package-level variable
  • Zero reflection cost at runtime
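
The init-time pattern described above can be sketched with the standard `reflect` package. `collectTags` is a hypothetical stand-in for `GenerateConfigSchema` that only gathers raw tag strings; the real function returns a richer schema value. The sketch also assumes plain `json` tags without options such as `omitempty`.

```go
package main

import (
	"fmt"
	"reflect"
)

type MyConfig struct {
	Name string `json:"name" schema:"type:string,description:Component name,category:basic"`
	Port int    `json:"port" schema:"type:int,description:Port,min:1,max:65535,default:8080"`
}

// collectTags walks the struct fields once and returns the raw schema tag
// for each JSON field name.
func collectTags(t reflect.Type) map[string]string {
	tags := make(map[string]string, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		tag, ok := field.Tag.Lookup("schema")
		if !ok {
			continue // untagged fields are skipped
		}
		name := field.Tag.Get("json")
		if name == "" {
			name = field.Name
		}
		tags[name] = tag
	}
	return tags
}

// myConfigTags is computed once during package initialization; later reads
// are plain map lookups with no reflection.
var myConfigTags = collectTags(reflect.TypeOf(MyConfig{}))

func main() {
	fmt.Println(len(myConfigTags), myConfigTags["port"])
}
```
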

Error Handling

Invalid tags result in graceful degradation:

  • Fields with invalid tags are skipped
  • Errors are wrapped with context using pkg/errors
  • Missing descriptions use field names as fallback

See docs/architecture/SCHEMA_TAG_SPEC.md for complete specification.

Constants

const (
	MaxStringLength = 1024          // Maximum length for string values
	MaxJSONSize     = 1024 * 1024   // Maximum JSON size (1MB)
	MinPort         = 1             // Minimum valid port number
	MaxPort         = 65535         // Maximum valid port number
	MaxInt          = math.MaxInt32 // Maximum safe integer value
	MinInt          = math.MinInt32 // Minimum safe integer value
)

Config validation constants - security limits

Variables

This section is empty.

Functions

func BenchmarkLifecycleMethods

func BenchmarkLifecycleMethods(b *testing.B, factory LifecycleFactory)

BenchmarkLifecycleMethods provides benchmark tests for lifecycle operations

func BuildPayload

func BuildPayload(domain, category, version string, fields map[string]any) (any, error)

BuildPayload creates a typed payload from field mappings using the globally registered builder. Returns an error if the payload type is not registered or if the builder fails. This is used by workflow variable interpolation to construct typed payloads from step output maps. Returns any to avoid import cycles - the actual payload implements message.Payload.

func CreatePayload

func CreatePayload(domain, category, version string) any

CreatePayload creates a payload instance using the globally registered factory. Returns nil if no factory is registered for the given type.

func GenerateCacheFieldSchema

func GenerateCacheFieldSchema() map[string]CacheFieldInfo

GenerateCacheFieldSchema generates metadata for cache.Config fields. This describes which fields in cache.Config are editable and their constraints, enabling the UI to render appropriate controls for cache configuration.

The function examines cache.Config struct tags to determine:

  • Field types (for appropriate UI controls)
  • Editability (whether users can modify the field)
  • Enum values (for strategy field)
  • Numeric constraints (for size limits)

All cache.Config fields are marked as "editable" to allow runtime configuration.

This metadata is included in ConfigSchema for fields with type "cache", allowing the frontend to correctly render cache configuration forms.

Returns:

  • Map of field names to CacheFieldInfo with type, editability, and constraint metadata

func GeneratePortFieldSchema

func GeneratePortFieldSchema() map[string]PortFieldInfo

GeneratePortFieldSchema generates metadata for PortDefinition fields. This describes which fields in PortDefinition are editable vs read-only, enabling the UI to render appropriate controls for port configuration.

The function examines PortDefinition struct tags to determine:

  • Field types (for appropriate UI controls)
  • Editability (whether users can modify the field)

Fields marked with "editable" tag are user-modifiable (e.g., Subject, Timeout). Fields marked with "readonly" tag are display-only (e.g., Name, Type). Fields without schema tags default to read-only string type.

This metadata is included in ConfigSchema for fields with type "ports", allowing the frontend to correctly render port configuration forms.

Returns:

  • Map of field names to PortFieldInfo with type and editability metadata

func GetBool

func GetBool(config map[string]any, key string, defaultValue bool) bool

GetBool safely extracts a boolean value from config with a default fallback and validation

func GetFloat64

func GetFloat64(config map[string]any, key string, defaultValue float64) float64

GetFloat64 safely extracts a float64 value from config with a default fallback and validation

func GetInt

func GetInt(config map[string]any, key string, defaultValue int) int

GetInt safely extracts an integer value from config with a default fallback and bounds checking
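A getter of this shape could be sketched as follows. The name getIntSketch and the exact policy (fall back to the default for a missing key, a non-numeric value, a fractional value, or a value outside the int32 range) are assumptions made for illustration; the package's real behavior may differ. The float64 case matters because encoding/json decodes every JSON number into float64 when the target is map[string]any.

```go
package main

import (
	"fmt"
	"math"
)

// getIntSketch is a hypothetical stand-in for a GetInt-style helper.
func getIntSketch(config map[string]any, key string, defaultValue int) int {
	raw, ok := config[key] // indexing a nil map is safe and yields ok == false
	if !ok {
		return defaultValue
	}
	var n float64
	switch v := raw.(type) {
	case int:
		n = float64(v)
	case int64:
		n = float64(v)
	case float64:
		n = v
	default:
		return defaultValue // wrong type, e.g. a string
	}
	// Reject fractional values, NaN, and anything outside the assumed int32 bounds.
	if n != math.Trunc(n) || n < math.MinInt32 || n > math.MaxInt32 {
		return defaultValue
	}
	return int(n)
}

func main() {
	cfg := map[string]any{"port": float64(8080), "ratio": 1.5}
	fmt.Println(getIntSketch(cfg, "port", 1), getIntSketch(cfg, "ratio", 7), getIntSketch(nil, "port", 9))
}
```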

func GetProperties

func GetProperties(schema ConfigSchema, category string) map[string]PropertySchema

GetProperties filters schema properties by category for UI organization.

Components can categorize their configuration properties as "basic" (shown by default) or "advanced" (hidden in collapsible section). This function extracts properties belonging to a specific category.

Parameters:

  • schema: The component's configuration schema
  • category: Filter by "basic" or "advanced", or empty string for all properties

Properties without an explicit Category field default to "advanced".

Returns a map of property names to PropertySchema definitions matching the category.

Example:

schema := component.ConfigSchema{
    Properties: map[string]component.PropertySchema{
        "port":        {Type: "int", Category: "basic"},
        "buffer_size": {Type: "int", Category: "advanced"},
        "timeout":     {Type: "int"}, // Defaults to "advanced"
    },
}

basicProps := component.GetProperties(schema, "basic")
// Returns: map["port": {...}]

advancedProps := component.GetProperties(schema, "advanced")
// Returns: map["buffer_size": {...}, "timeout": {...}]

func GetPropertyValue

func GetPropertyValue(config map[string]any, key string) (any, bool)

GetPropertyValue safely extracts a property value from a configuration map.

Returns the value and true if the key exists, or nil and false if the key is not present in the map. This function is nil-safe - passing a nil config will return (nil, false).

Example:

config := map[string]any{"port": 8080, "host": "localhost"}
if port, exists := component.GetPropertyValue(config, "port"); exists {
    fmt.Printf("Port: %v\n", port)
}

func GetString

func GetString(config map[string]any, key string, defaultValue string) string

GetString safely extracts a string value from config with a default fallback and validation

func IsComplexType

func IsComplexType(propType string) bool

IsComplexType returns true if a property type requires complex rendering.

Complex types (object, array) cannot be rendered as simple form inputs and require specialized UI components like JSON editors or nested form builders.

Currently identifies "object" and "array" types as complex. In MVP, the UI falls back to a JSON editor for these types.

Example:

if component.IsComplexType(propSchema.Type) {
    // Use JSON editor fallback
    renderJSONEditor(propSchema)
} else {
    // Render type-specific input field
    renderInputField(propSchema)
}

func IsLifecycleComponent

func IsLifecycleComponent(comp Discoverable) bool

IsLifecycleComponent checks if a component supports lifecycle management

func RegisterPayload

func RegisterPayload(registration *PayloadRegistration) error

RegisterPayload registers a payload factory globally. This allows typed payloads to be recreated during message deserialization. Payloads use init() registration as they are data types, not lifecycle components.

func SafeUnmarshal

func SafeUnmarshal(rawConfig json.RawMessage, target any) error

SafeUnmarshal performs validated unmarshaling into a target struct. It validates the JSON first, then unmarshals with additional type checking.

func SortedPropertyNames

func SortedPropertyNames(schema ConfigSchema) []string

SortedPropertyNames returns property names in UI display order.

Properties are sorted by:

  1. Category: "basic" properties first, then "advanced" properties
  2. Alphabetically within each category

This ensures consistent, predictable ordering in configuration UIs. Properties without an explicit Category default to "advanced".

Example:

schema := component.ConfigSchema{
    Properties: map[string]component.PropertySchema{
        "port":         {Category: "basic"},
        "bind_address": {Category: "basic"},
        "buffer_size":  {Category: "advanced"},
        "timeout":      {}, // Defaults to "advanced"
    },
}

names := component.SortedPropertyNames(schema)
// Returns: ["bind_address", "port", "buffer_size", "timeout"]
//          ^-- basic (alpha) --^  ^---- advanced (alpha) ----^
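The ordering rule described for SortedPropertyNames can be reproduced with a two-key comparison. The following is a sketch under assumptions: propertySketch and sortedNamesSketch are hypothetical local stand-ins for PropertySchema and the real function, and any category other than "basic" is treated as "advanced".

```go
package main

import (
	"fmt"
	"sort"
)

// propertySketch is a hypothetical stand-in for component.PropertySchema.
type propertySketch struct {
	Category string
}

// sortedNamesSketch orders names by category ("basic" first), then alphabetically.
func sortedNamesSketch(props map[string]propertySketch) []string {
	names := make([]string, 0, len(props))
	for name := range props {
		names = append(names, name)
	}
	rank := func(name string) int {
		if props[name].Category == "basic" {
			return 0
		}
		return 1 // "advanced" and unset categories sort after "basic"
	}
	sort.Slice(names, func(i, j int) bool {
		ri, rj := rank(names[i]), rank(names[j])
		if ri != rj {
			return ri < rj
		}
		return names[i] < names[j]
	})
	return names
}

func main() {
	props := map[string]propertySketch{
		"port":         {Category: "basic"},
		"bind_address": {Category: "basic"},
		"buffer_size":  {Category: "advanced"},
		"timeout":      {},
	}
	fmt.Println(sortedNamesSketch(props))
}
```

Because Go map iteration order is randomized, sorting on both keys is what makes the result deterministic.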

func StandardLifecycleTests

func StandardLifecycleTests(t *testing.T, factory LifecycleFactory)

StandardLifecycleTests runs comprehensive lifecycle tests for any component that implements LifecycleComponent. This ensures consistent testing standards across all components in the SemStreams system.

func TestErrorInjection

func TestErrorInjection(t *testing.T, factory LifecycleFactory)

TestErrorInjection tests components with injected errors

func ValidateComponentName

func ValidateComponentName(name string) error

ValidateComponentName validates component/instance names for security

func ValidateConfigKey

func ValidateConfigKey(key string) error

ValidateConfigKey checks if a configuration key is valid

func ValidateFactoryConfig

func ValidateFactoryConfig(rawConfig json.RawMessage) error

ValidateFactoryConfig performs validation before passing the configuration to a factory. This is the main security gate for all component configurations.

func ValidateJSONSize

func ValidateJSONSize(data json.RawMessage) error

ValidateJSONSize checks if JSON input is within safe limits
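A pre-factory gate of this kind might combine a size limit with a syntax check. The sketch below is an assumption about the general shape only: validateJSONSketch is a hypothetical name, the 1 MB limit mirrors the MaxJSONSize constant listed above, and the real validators may perform further checks.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// maxJSONSizeSketch mirrors the documented MaxJSONSize constant (1MB).
const maxJSONSizeSketch = 1024 * 1024

// validateJSONSketch rejects oversized or syntactically invalid JSON
// before any attempt is made to unmarshal it.
func validateJSONSketch(data json.RawMessage) error {
	if len(data) > maxJSONSizeSketch {
		return fmt.Errorf("config is %d bytes, limit is %d", len(data), maxJSONSizeSketch)
	}
	if !json.Valid(data) {
		return errors.New("config is not valid JSON")
	}
	return nil
}

func main() {
	fmt.Println(validateJSONSketch(json.RawMessage(`{"port": 8080}`)))
	fmt.Println(validateJSONSketch(json.RawMessage(`{"port": `)))
}
```

Checking the size first keeps the cost of rejecting a hostile payload independent of its content.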

func ValidateNetworkConfig

func ValidateNetworkConfig(port int, bindAddr string) error

ValidateNetworkConfig validates network configuration including port and bind address

func ValidatePortNumber

func ValidatePortNumber(port int) error

ValidatePortNumber validates port numbers are within valid range
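The two network validators can be pictured as a range check plus an address check. This is a sketch, not the package's code: the function names are hypothetical, and requiring the bind address to be an IP literal is an assumption (the real function may also accept hostnames or an empty string).

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// These bounds mirror the documented MinPort and MaxPort constants.
const (
	minPortSketch = 1
	maxPortSketch = 65535
)

// validatePortSketch checks that a port lies within the valid TCP/UDP range.
func validatePortSketch(port int) error {
	if port < minPortSketch || port > maxPortSketch {
		return fmt.Errorf("port %d outside range %d-%d", port, minPortSketch, maxPortSketch)
	}
	return nil
}

// validateNetworkSketch validates the port first, then the bind address.
func validateNetworkSketch(port int, bindAddr string) error {
	if err := validatePortSketch(port); err != nil {
		return err
	}
	if net.ParseIP(bindAddr) == nil {
		return errors.New("bind address is not a valid IP literal")
	}
	return nil
}

func main() {
	fmt.Println(validateNetworkSketch(8080, "127.0.0.1"))
	fmt.Println(validateNetworkSketch(0, "127.0.0.1"))
	fmt.Println(validateNetworkSketch(8080, "not-an-ip"))
}
```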

func WithLogger

func WithLogger(logger *slog.Logger) func(*Registry)

WithLogger sets a custom logger for the registry

Types

type CacheFieldInfo

type CacheFieldInfo struct {
	Type     string   `json:"type"`
	Editable bool     `json:"editable"`
	Enum     []string `json:"enum,omitempty"` // For strategy field
	Min      *int     `json:"min,omitempty"`  // For numeric fields
}

CacheFieldInfo describes metadata for cache.Config fields

type CapabilityAnnouncement

type CapabilityAnnouncement struct {
	InstanceName string           `json:"instance_name"`
	Component    string           `json:"component"`
	Type         string           `json:"type"`
	Version      string           `json:"version"`
	InputPorts   []PortCapability `json:"input_ports,omitempty"`
	OutputPorts  []PortCapability `json:"output_ports,omitempty"`
	Timestamp    time.Time        `json:"timestamp"`
	TTL          time.Duration    `json:"ttl"`
	NodeID       string           `json:"node_id"`
}

CapabilityAnnouncement is published to NATS when components register.

type ConfigSchema

type ConfigSchema struct {
	Properties map[string]PropertySchema `json:"properties"`
	Required   []string                  `json:"required"`
}

ConfigSchema describes the configuration parameters for a component

func GenerateConfigSchema

func GenerateConfigSchema(configType reflect.Type) ConfigSchema

GenerateConfigSchema generates a ConfigSchema from a struct type using reflection. This function performs one-time reflection at initialization to extract schema metadata from struct field tags, eliminating the need for manual schema definitions.

Usage Pattern:

type MyComponentConfig struct {
    Name string `json:"name" schema:"type:string,description:Name,category:basic"`
    Port int    `json:"port" schema:"type:int,description:Port,min:1,max:65535"`
}

var schema = component.GenerateConfigSchema(reflect.TypeOf(MyComponentConfig{}))

Field Processing:

  • Only exported fields with both 'json' and 'schema' tags are included
  • json:"-" fields are skipped
  • Fields without schema tags are skipped
  • Invalid schema tags result in skipped fields (graceful degradation)

Special Handling:

  • Fields with type "ports" automatically include PortFieldSchema metadata
  • Default values are converted from strings to appropriate types
  • Required fields are added to the schema's Required list

Performance:

  • Call once at init() time - reflection cost is paid only once
  • Generated schemas are cached in package-level variables
  • Zero reflection overhead at runtime

Parameters:

  • configType: The reflect.Type of the config struct (use reflect.TypeOf(ConfigStruct{})). Pointer types are automatically dereferenced.

Returns:

  • ConfigSchema with Properties map and Required list populated from struct tags
  • Empty schema for non-struct types
Example

ExampleGenerateConfigSchema demonstrates how to use schema tags to auto-generate configuration schemas from struct definitions

package main

import (
	"encoding/json"
	"fmt"
	"reflect"

	"github.com/c360studio/semstreams/component"
)

func main() {
	// Define a configuration struct with schema tags
	type ComponentConfig struct {
		// Basic configuration
		Name    string `json:"name"    schema:"type:string,description:Component name,category:basic"`
		Port    int    `json:"port"    schema:"type:int,description:Listen port,min:1,max:65535,default:8080,category:basic"`
		Enabled bool   `json:"enabled" schema:"type:bool,description:Enable component,default:true,category:basic"`

		// Advanced configuration
		Timeout  string `json:"timeout"   schema:"type:string,description:Request timeout,default:30s,category:advanced"`
		LogLevel string `json:"log_level" schema:"type:enum,description:Logging level,enum:debug|info|warn|error,default:info,category:advanced"`

		// Required field
		APIKey string `json:"api_key" schema:"required,type:string,description:Authentication API key"`
	}

	// Generate the schema at init time (one-time reflection cost)
	schema := component.GenerateConfigSchema(reflect.TypeOf(ComponentConfig{}))

	// The generated schema can be used for validation, UI generation, etc.
	schemaJSON, _ := json.MarshalIndent(schema, "", "  ")
	fmt.Println(string(schemaJSON))

	// Output will show the generated schema with all properties
}

type ConfigValidator

type ConfigValidator struct {
	// contains filtered or unexported fields
}

ConfigValidator provides secure validation for component configurations

func NewConfigValidator

func NewConfigValidator() *ConfigValidator

NewConfigValidator creates a validator with secure defaults

func (*ConfigValidator) ValidateConfig

func (v *ConfigValidator) ValidateConfig(rawConfig json.RawMessage) error

ValidateConfig performs comprehensive validation on raw JSON config. This prevents injection attacks, resource exhaustion, and malformed input.

type ConsumerConfig

type ConsumerConfig struct {
	DeliverPolicy string
	AckPolicy     string
	MaxDeliver    int
}

ConsumerConfig holds extracted JetStream consumer configuration.

func GetConsumerConfig

func GetConsumerConfig(port Port) ConsumerConfig

GetConsumerConfig extracts JetStream consumer configuration from a port. Returns safe defaults if port doesn't have JetStream config:

  • DeliverPolicy: "new" (safe default - don't replay historical messages)
  • AckPolicy: "explicit"
  • MaxDeliver: 3
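The documented defaults can be expressed as a small normalization step. In this sketch, consumerConfigSketch and withConsumerDefaults are hypothetical names, and filling each unset field individually is an assumption; the source only states the defaults used when a port carries no JetStream configuration.

```go
package main

import "fmt"

// consumerConfigSketch is a hypothetical local copy of the ConsumerConfig fields.
type consumerConfigSketch struct {
	DeliverPolicy string
	AckPolicy     string
	MaxDeliver    int
}

// withConsumerDefaults fills unset fields with the documented safe defaults.
func withConsumerDefaults(cfg consumerConfigSketch) consumerConfigSketch {
	if cfg.DeliverPolicy == "" {
		cfg.DeliverPolicy = "new" // do not replay historical messages
	}
	if cfg.AckPolicy == "" {
		cfg.AckPolicy = "explicit"
	}
	if cfg.MaxDeliver <= 0 {
		cfg.MaxDeliver = 3
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withConsumerDefaults(consumerConfigSketch{}))
	fmt.Printf("%+v\n", withConsumerDefaults(consumerConfigSketch{DeliverPolicy: "all", MaxDeliver: 5}))
}
```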

func GetConsumerConfigFromDefinition

func GetConsumerConfigFromDefinition(portDef PortDefinition) ConsumerConfig

GetConsumerConfigFromDefinition extracts JetStream consumer configuration from a port definition. This is a convenience wrapper for use with PortDefinition instead of Port.

type DebugStatusProvider

type DebugStatusProvider interface {
	// DebugStatus returns extended debug information for the component.
	// The returned value should be JSON-serializable.
	DebugStatus() any
}

DebugStatusProvider is an optional interface for components that can provide extended debug information beyond basic health status.

type Dependencies

type Dependencies struct {
	NATSClient      *natsclient.Client      // NATS client for messaging
	MetricsRegistry *metric.MetricsRegistry // Metrics registry for Prometheus (can be nil)
	Logger          *slog.Logger            // Structured logger (can be nil, defaults to slog.Default())
	Platform        PlatformMeta            // Platform identity (organization and platform)
	Security        security.Config         // Platform-wide security configuration
	ModelRegistry   model.RegistryReader    // Unified model registry (can be nil)
}

Dependencies provides all external dependencies needed by components. Components receive them as a single structured value rather than as individual fields.

func (*Dependencies) GetLogger

func (d *Dependencies) GetLogger() *slog.Logger

GetLogger returns the configured logger or a default logger if none is provided

func (*Dependencies) GetLoggerWithComponent

func (d *Dependencies) GetLoggerWithComponent(componentName string) *slog.Logger

GetLoggerWithComponent returns a logger configured with component context

type Direction

type Direction string

Direction for data flow

const (
	DirectionInput  Direction = "input"
	DirectionOutput Direction = "output"
)

Direction constants for port data flow

type Discoverable

type Discoverable interface {
	// Meta returns basic component information
	Meta() Metadata

	// InputPorts returns the ports this component accepts data on
	InputPorts() []Port

	// OutputPorts returns the ports this component produces data on
	OutputPorts() []Port

	// ConfigSchema returns the configuration schema for this component
	ConfigSchema() ConfigSchema

	// Health returns current health status
	Health() HealthStatus

	// DataFlow returns current data flow metrics
	DataFlow() FlowMetrics
}

Discoverable defines the interface for components that can be discovered and inspected by the management layer. This interface enables dynamic discovery of component capabilities, configuration, and health status.

Components implementing this interface can be:

  • Input components: Accept external data (UDP, TCP, HTTP)
  • Processor components: Transform data (plugins)
  • Output components: Send data to external systems
  • Storage components: Store and retrieve data (ObjectStore, KV)

type ErrorInjectingComponent

type ErrorInjectingComponent struct {
	LifecycleComponent
	// contains filtered or unexported fields
}

ErrorInjectingComponent wraps a component to inject errors for testing

func NewErrorInjectingComponent

func NewErrorInjectingComponent(comp LifecycleComponent) *ErrorInjectingComponent

NewErrorInjectingComponent creates a component wrapper that can inject errors for testing

func (*ErrorInjectingComponent) Initialize

func (e *ErrorInjectingComponent) Initialize() error

Initialize initializes the component, returning injected error if configured

func (*ErrorInjectingComponent) InjectInitializeError

func (e *ErrorInjectingComponent) InjectInitializeError(err error)

InjectInitializeError configures the component to return an error on Initialize

func (*ErrorInjectingComponent) InjectStartError

func (e *ErrorInjectingComponent) InjectStartError(err error)

InjectStartError configures the component to return an error on Start

func (*ErrorInjectingComponent) InjectStopError

func (e *ErrorInjectingComponent) InjectStopError(err error)

InjectStopError configures the component to return an error on Stop

func (*ErrorInjectingComponent) Start

func (e *ErrorInjectingComponent) Start(ctx context.Context) error

Start starts the component, returning injected error if configured

func (*ErrorInjectingComponent) Stop

func (e *ErrorInjectingComponent) Stop(timeout time.Duration) error

Stop stops the component, returning injected error if configured
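The wrapping technique behind an error-injecting component can be shown with interface embedding. The sketch below uses hypothetical local types (lifecycleSketch, noopSketch, errorInjectorSketch) rather than the package's own, and covers only Start; it illustrates the pattern and does not reproduce ErrorInjectingComponent.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// lifecycleSketch is a hypothetical local copy of the three lifecycle methods.
type lifecycleSketch interface {
	Initialize() error
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

// noopSketch is a trivial component whose lifecycle methods always succeed.
type noopSketch struct{}

func (noopSketch) Initialize() error                { return nil }
func (noopSketch) Start(ctx context.Context) error  { return nil }
func (noopSketch) Stop(timeout time.Duration) error { return nil }

// errorInjectorSketch embeds the wrapped component and overrides Start.
type errorInjectorSketch struct {
	lifecycleSketch
	startErr error
}

// InjectStartError configures the error returned by the next Start calls.
func (e *errorInjectorSketch) InjectStartError(err error) { e.startErr = err }

// Start returns the injected error if set, otherwise delegates to the wrapped component.
func (e *errorInjectorSketch) Start(ctx context.Context) error {
	if e.startErr != nil {
		return e.startErr
	}
	return e.lifecycleSketch.Start(ctx)
}

// demoInjectedStart calls Start before and after injecting an error.
func demoInjectedStart() (before, after error) {
	wrapped := &errorInjectorSketch{lifecycleSketch: noopSketch{}}
	before = wrapped.Start(context.Background())
	wrapped.InjectStartError(errors.New("injected start failure"))
	after = wrapped.Start(context.Background())
	return before, after
}

func main() {
	before, after := demoInjectedStart()
	fmt.Println(before, after)
}
```

Methods that are not overridden (Initialize and Stop here) are promoted from the embedded value, so the wrapper still satisfies the full interface.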

type Factory

type Factory func(rawConfig json.RawMessage, deps Dependencies) (Discoverable, error)

Factory creates a component instance from configuration, following the service pattern. The factory function receives raw JSON configuration and dependencies, parses its own config, and returns a properly initialized component that implements the Discoverable interface. All I/O operations should be performed in the component's Start() method, not in the factory. This pattern matches service constructors: func(rawConfig json.RawMessage, deps Dependencies) (Service, error)
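A factory following this contract parses and validates its own configuration and defers all I/O to Start. The sketch below is illustrative only: udpConfigSketch, udpInputSketch, newUDPInputSketch, and the default values are hypothetical, and the Dependencies argument and the Discoverable methods are omitted to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// udpConfigSketch is a hypothetical component configuration.
type udpConfigSketch struct {
	Port int    `json:"port"`
	Bind string `json:"bind"`
}

// udpInputSketch holds parsed configuration; it opens no sockets until started.
type udpInputSketch struct {
	cfg udpConfigSketch
}

// newUDPInputSketch mimics the factory shape: raw JSON in, initialized component out.
func newUDPInputSketch(rawConfig json.RawMessage) (*udpInputSketch, error) {
	// Start from defaults (values chosen for illustration) so omitted fields stay usable.
	cfg := udpConfigSketch{Port: 14550, Bind: "0.0.0.0"}
	if len(rawConfig) > 0 {
		if err := json.Unmarshal(rawConfig, &cfg); err != nil {
			return nil, fmt.Errorf("parse udp config: %w", err)
		}
	}
	if cfg.Port < 1 || cfg.Port > 65535 {
		return nil, errors.New("port out of range")
	}
	// No network I/O here; a real component would bind in Start.
	return &udpInputSketch{cfg: cfg}, nil
}

func main() {
	comp, err := newUDPInputSketch(json.RawMessage(`{"port": 9000}`))
	fmt.Println(comp.cfg.Port, comp.cfg.Bind, err)
}
```

Keeping the factory free of I/O means a configuration can be validated, or a component constructed in tests, without touching the network.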

type FilePort

type FilePort struct {
	Path    string `json:"path"`
	Pattern string `json:"pattern,omitempty"`
}

FilePort - File system access

func (FilePort) IsExclusive

func (f FilePort) IsExclusive() bool

IsExclusive returns false as multiple components can read files

func (FilePort) ResourceID

func (f FilePort) ResourceID() string

ResourceID returns unique identifier for file ports

func (FilePort) Type

func (f FilePort) Type() string

Type returns the port type identifier

type FlowMetrics

type FlowMetrics struct {
	MessagesPerSecond float64   `json:"messages_per_second"`
	BytesPerSecond    float64   `json:"bytes_per_second"`
	ErrorRate         float64   `json:"error_rate"`
	LastActivity      time.Time `json:"last_activity"`
}

FlowMetrics describes the current data flow through a component

type HealthStatus

type HealthStatus struct {
	Healthy    bool          `json:"healthy"`
	LastCheck  time.Time     `json:"last_check"`
	ErrorCount int           `json:"error_count"`
	LastError  string        `json:"last_error,omitempty"`
	Uptime     time.Duration `json:"uptime"`
	Status     string        `json:"status"`
}

HealthStatus describes the current health state of a component

type Info

type Info struct {
	Type        string `json:"type"`        // "input", "processor", "output", "storage"
	Protocol    string `json:"protocol"`    // Technical protocol (udp, tcp, mavlink, etc.)
	Domain      string `json:"domain"`      // Business domain (robotics, semantic, network, storage)
	Description string `json:"description"` // Human-readable description
	Version     string `json:"version"`     // Component version
}

Info holds metadata about an available component type

type InterfaceContract

type InterfaceContract struct {
	Type       string   `json:"type"`                 // e.g., "message.Storable"
	Version    string   `json:"version,omitempty"`    // e.g., "v1"
	Compatible []string `json:"compatible,omitempty"` // Also accepts these
}

InterfaceContract defines expected message interface

type JetStreamPort

type JetStreamPort struct {
	// Stream configuration (for outputs)
	StreamName      string   `json:"stream_name"`              // e.g., "ENTITY_EVENTS"
	Subjects        []string `json:"subjects"`                 // e.g., ["events.graph.entity.>"]
	Storage         string   `json:"storage,omitempty"`        // "file" or "memory", default "file"
	RetentionPolicy string   `json:"retention,omitempty"`      // "limits", "interest", "work_queue", default "limits"
	RetentionDays   int      `json:"retention_days,omitempty"` // Message retention in days, default 7
	MaxSizeGB       int      `json:"max_size_gb,omitempty"`    // Max stream size in GB, default 10
	Replicas        int      `json:"replicas,omitempty"`       // Number of replicas, default 1

	// Consumer configuration (for inputs)
	ConsumerName  string `json:"consumer_name,omitempty"`  // Durable consumer name
	DeliverPolicy string `json:"deliver_policy,omitempty"` // "all", "last", "new", default "new"
	AckPolicy     string `json:"ack_policy,omitempty"`     // "explicit", "none", "all", default "explicit"
	MaxDeliver    int    `json:"max_deliver,omitempty"`    // Max redelivery attempts, default 3

	// Interface contract
	Interface *InterfaceContract `json:"interface,omitempty"`
}

JetStreamPort - NATS JetStream for durable, at-least-once messaging

func (JetStreamPort) IsExclusive

func (j JetStreamPort) IsExclusive() bool

IsExclusive returns false as JetStream manages consumer coordination

func (JetStreamPort) ResourceID

func (j JetStreamPort) ResourceID() string

ResourceID returns unique identifier for JetStream ports

func (JetStreamPort) Type

func (j JetStreamPort) Type() string

Type returns the port type identifier

type KVLifecycleReporter

type KVLifecycleReporter struct {
	// contains filtered or unexported fields
}

KVLifecycleReporter implements LifecycleReporter using NATS KV storage. It writes component status to the COMPONENT_STATUS bucket.

func NewKVLifecycleReporter

func NewKVLifecycleReporter(kv jetstream.KeyValue, componentName string, logger *slog.Logger) *KVLifecycleReporter

NewKVLifecycleReporter creates a new lifecycle reporter that writes to NATS KV.

func (*KVLifecycleReporter) ReportCycleComplete

func (r *KVLifecycleReporter) ReportCycleComplete(ctx context.Context) error

ReportCycleComplete marks successful cycle completion.

func (*KVLifecycleReporter) ReportCycleError

func (r *KVLifecycleReporter) ReportCycleError(ctx context.Context, err error) error

ReportCycleError marks cycle failure with error details.

func (*KVLifecycleReporter) ReportCycleStart

func (r *KVLifecycleReporter) ReportCycleStart(ctx context.Context) error

ReportCycleStart marks the beginning of a new processing cycle.

func (*KVLifecycleReporter) ReportStage

func (r *KVLifecycleReporter) ReportStage(ctx context.Context, stage string) error

ReportStage updates the component's current processing stage.

type KVWatchPort

type KVWatchPort struct {
	Bucket    string             `json:"bucket"`            // e.g., "ENTITY_STATES"
	Keys      []string           `json:"keys,omitempty"`    // Keys to watch, empty = all
	History   bool               `json:"history,omitempty"` // Include historical values
	Interface *InterfaceContract `json:"interface,omitempty"`
}

KVWatchPort - NATS KV Watch for state observation

func (KVWatchPort) IsExclusive

func (k KVWatchPort) IsExclusive() bool

IsExclusive returns false as multiple watchers are allowed

func (KVWatchPort) ResourceID

func (k KVWatchPort) ResourceID() string

ResourceID returns unique identifier for KV watch ports

func (KVWatchPort) Type

func (k KVWatchPort) Type() string

Type returns the port type identifier

type KVWritePort

type KVWritePort struct {
	Bucket    string             `json:"bucket"`              // e.g., "ENTITY_STATES"
	Interface *InterfaceContract `json:"interface,omitempty"` // Data type contract
}

KVWritePort - NATS KV Write for state persistence

func (KVWritePort) IsExclusive

func (k KVWritePort) IsExclusive() bool

IsExclusive returns false as multiple writers are allowed (with CAS handling)

func (KVWritePort) ResourceID

func (k KVWritePort) ResourceID() string

ResourceID returns unique identifier for KV write ports

func (KVWritePort) Type

func (k KVWritePort) Type() string

Type returns the port type identifier

type LifecycleComponent

type LifecycleComponent interface {
	Discoverable
	Initialize() error
	Start(ctx context.Context) error
	Stop(timeout time.Duration) error
}

LifecycleComponent defines components that support full lifecycle management following the unified Pattern A:

  • Initialize() error // Setup/create only, NO context
  • Start(ctx context.Context) error // Start with context passed through
  • Stop(timeout time.Duration) error // Stop with timeout for graceful shutdown
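The three-method contract can be sketched with a minimal background worker. Everything here is a hypothetical illustration (workerSketch and runLifecycleSketch are not part of the package, and the Discoverable methods are omitted): Initialize only allocates state, Start receives the context as a parameter without storing it, and Stop waits for the goroutine up to the given timeout.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// workerSketch is a hypothetical component with a single background goroutine.
type workerSketch struct {
	stop     chan struct{}
	done     chan struct{}
	stopOnce sync.Once
}

// Initialize allocates state only; it takes no context and performs no I/O.
func (w *workerSketch) Initialize() error {
	w.stop = make(chan struct{})
	w.done = make(chan struct{})
	return nil
}

// Start launches the goroutine; the context is used but never stored on the struct.
func (w *workerSketch) Start(ctx context.Context) error {
	if w.stop == nil {
		return errors.New("start called before initialize")
	}
	go func() {
		defer close(w.done)
		select {
		case <-ctx.Done(): // cancelled by the caller
		case <-w.stop: // stopped explicitly
		}
	}()
	return nil
}

// Stop signals the goroutine and waits for it to exit, up to timeout.
func (w *workerSketch) Stop(timeout time.Duration) error {
	if w.stop == nil {
		return errors.New("stop called before initialize")
	}
	w.stopOnce.Do(func() { close(w.stop) })
	select {
	case <-w.done:
		return nil
	case <-time.After(timeout):
		return errors.New("stop timed out")
	}
}

// runLifecycleSketch drives one full Initialize, Start, Stop cycle.
func runLifecycleSketch() error {
	w := &workerSketch{}
	if err := w.Initialize(); err != nil {
		return err
	}
	if err := w.Start(context.Background()); err != nil {
		return err
	}
	return w.Stop(time.Second)
}

func main() {
	fmt.Println(runLifecycleSketch())
}
```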

func AsLifecycleComponent

func AsLifecycleComponent(comp Discoverable) (LifecycleComponent, bool)

AsLifecycleComponent safely casts a component to LifecycleComponent

type LifecycleFactory

type LifecycleFactory func() LifecycleComponent

LifecycleFactory creates a new instance of a LifecycleComponent for testing

type LifecycleReporter

type LifecycleReporter interface {
	// ReportStage updates the component's current processing stage
	ReportStage(ctx context.Context, stage string) error

	// ReportCycleStart marks the beginning of a new processing cycle
	ReportCycleStart(ctx context.Context) error

	// ReportCycleComplete marks successful cycle completion
	ReportCycleComplete(ctx context.Context) error

	// ReportCycleError marks cycle failure with error details
	ReportCycleError(ctx context.Context, err error) error
}

LifecycleReporter allows components to report their current processing stage. This enables observability for long-running async components (ADR-003).

func NewLifecycleReporterFromConfig

func NewLifecycleReporterFromConfig(cfg LifecycleReporterConfig) LifecycleReporter

NewLifecycleReporterFromConfig creates a lifecycle reporter based on configuration. Returns either a throttled KV reporter, plain KV reporter, or no-op reporter.

type LifecycleReporterConfig

type LifecycleReporterConfig struct {
	// KV is the NATS KV bucket for status storage. Required for KV reporter.
	KV jetstream.KeyValue

	// ComponentName is the name of the component. Required.
	ComponentName string

	// Logger for the reporter. Optional.
	Logger *slog.Logger

	// EnableThrottling enables throttled writes. Default: true.
	EnableThrottling bool

	// ThrottleInterval is the minimum interval between writes.
	// Only used if EnableThrottling is true. Default: 1 second.
	ThrottleInterval time.Duration

	// Disabled returns a no-op reporter if true.
	Disabled bool
}

LifecycleReporterConfig holds configuration for creating lifecycle reporters.

func DefaultLifecycleReporterConfig

func DefaultLifecycleReporterConfig(componentName string) LifecycleReporterConfig

DefaultLifecycleReporterConfig returns configuration with sensible defaults. EnableThrottling is true with 1 second interval.
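Throttling with a minimum interval between writes can be sketched as a simple time gate. This is an assumption about the general technique, not the package's implementation: throttleSketch and demoThrottleSketch are hypothetical names, and this sketch drops updates that arrive too early, whereas the real reporter may coalesce or defer them.

```go
package main

import (
	"fmt"
	"time"
)

// throttleSketch allows at most one write per interval.
type throttleSketch struct {
	interval time.Duration
	last     time.Time
}

// allow reports whether a write at time now should go through,
// and records the time of every write it permits.
func (t *throttleSketch) allow(now time.Time) bool {
	if !t.last.IsZero() && now.Sub(t.last) < t.interval {
		return false
	}
	t.last = now
	return true
}

// demoThrottleSketch replays six stage updates against a 1 second interval
// and returns how many of them would be written.
func demoThrottleSketch() int {
	t := &throttleSketch{interval: time.Second}
	base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	offsets := []time.Duration{
		0, 200 * time.Millisecond, 900 * time.Millisecond,
		1000 * time.Millisecond, 1500 * time.Millisecond, 2100 * time.Millisecond,
	}
	writes := 0
	for _, off := range offsets {
		if t.allow(base.Add(off)) {
			writes++
		}
	}
	return writes
}

func main() {
	fmt.Println(demoThrottleSketch()) // 3: the updates at 0ms, 1000ms, and 2100ms
}
```

Passing the timestamp in, rather than calling time.Now inside allow, keeps the gate deterministic and easy to test.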

type LogEntry

type LogEntry struct {
	Timestamp string   `json:"timestamp"` // RFC3339 format
	Level     LogLevel `json:"level"`
	Component string   `json:"component"`
	FlowID    string   `json:"flow_id"`
	Message   string   `json:"message"`
	Stack     string   `json:"stack,omitempty"` // Stack trace for errors
}

LogEntry represents a structured log entry that can be published to NATS and consumed by the Flow Builder SSE endpoint.

type LogLevel

type LogLevel string

LogLevel represents the severity level of a log entry

const (
	// LogLevelDebug represents debug-level logs
	LogLevelDebug LogLevel = "DEBUG"
	// LogLevelInfo represents informational logs
	LogLevelInfo LogLevel = "INFO"
	// LogLevelWarn represents warning logs
	LogLevelWarn LogLevel = "WARN"
	// LogLevelError represents error logs
	LogLevelError LogLevel = "ERROR"
)

type ManagedComponent

type ManagedComponent struct {
	// Component is the actual component instance
	Component Discoverable

	// State tracks the current lifecycle state
	State State

	// Named Context Management for Individual Component Lifecycle Control
	//
	// These fields store named child contexts to enable individual component cancellation
	// during shutdown. This follows the pattern where ComponentManager creates a child
	// context for each component and passes it to lifecycle.Start(ctx).
	//
	// The component itself NEVER stores the context - it receives it as a parameter
	// following proper Go idioms. Only the ComponentManager stores these contexts
	// to coordinate orderly shutdown and individual component cancellation.
	//
	// Pattern:
	//   1. ComponentManager creates: ctx, cancel := context.WithCancel(parentCtx)
	//   2. ComponentManager stores: mc.Context = ctx, mc.Cancel = cancel
	//   3. ComponentManager calls: lifecycle.Start(mc.Context)
	//   4. Component uses context as parameter (proper Go idiom)
	//   5. ComponentManager can cancel individual components: mc.Cancel()
	Context context.Context    // Named child context for this specific component
	Cancel  context.CancelFunc // Named cancellation for this specific component

	// StartOrder tracks the order components were started for reverse shutdown
	StartOrder int

	// LastError tracks the last error that occurred during lifecycle operations
	LastError error
}

ManagedComponent tracks a component and its lifecycle state. This is used by ComponentManager to properly manage component lifecycle.
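The pattern described in the struct comments (one child context per component, cancellation held by the manager, shutdown in reverse start order) can be sketched as follows. The types and functions here are hypothetical simplifications; the real ComponentManager also calls the lifecycle methods and tracks State and LastError.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// managedSketch is a hypothetical, reduced form of ManagedComponent.
type managedSketch struct {
	Name       string
	Context    context.Context
	Cancel     context.CancelFunc
	StartOrder int
}

// startAllSketch creates one child context per component, in start order.
func startAllSketch(parent context.Context, names []string) []*managedSketch {
	managed := make([]*managedSketch, 0, len(names))
	for i, name := range names {
		ctx, cancel := context.WithCancel(parent)
		// A real manager would now call Start(ctx) on the component.
		managed = append(managed, &managedSketch{Name: name, Context: ctx, Cancel: cancel, StartOrder: i})
	}
	return managed
}

// stopAllSketch cancels components in reverse start order and returns that order.
func stopAllSketch(managed []*managedSketch) []string {
	ordered := append([]*managedSketch(nil), managed...) // copy so the input is not reordered
	sort.Slice(ordered, func(i, j int) bool {
		return ordered[i].StartOrder > ordered[j].StartOrder
	})
	stopped := make([]string, 0, len(ordered))
	for _, mc := range ordered {
		mc.Cancel() // cancels only this component's context
		stopped = append(stopped, mc.Name)
	}
	return stopped
}

// demoShutdownOrder starts three components and reports the order they are stopped in.
func demoShutdownOrder() []string {
	managed := startAllSketch(context.Background(), []string{"input", "processor", "output"})
	return stopAllSketch(managed)
}

func main() {
	fmt.Println(demoShutdownOrder())
}
```

Because each component has its own child context, cancelling one leaves its siblings running, while cancelling the parent still stops them all.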

type Metadata

type Metadata struct {
	Name        string `json:"name"`
	Type        string `json:"type"` // "input", "processor", "output", "storage"
	Description string `json:"description"`
	Version     string `json:"version"`
}

Metadata describes what a component is

type NATSPort

type NATSPort struct {
	Subject   string             `json:"subject"`
	Queue     string             `json:"queue,omitempty"`
	Interface *InterfaceContract `json:"interface,omitempty"`
}

NATSPort - NATS pub/sub

func (NATSPort) IsExclusive

func (n NATSPort) IsExclusive() bool

IsExclusive returns false as multiple components can subscribe

func (NATSPort) ResourceID

func (n NATSPort) ResourceID() string

ResourceID returns unique identifier for NATS ports

func (NATSPort) Type

func (n NATSPort) Type() string

Type returns the port type identifier

type NATSRequestPort

type NATSRequestPort struct {
	Subject   string             `json:"subject"`
	Timeout   string             `json:"timeout,omitempty"` // Duration string e.g. "1s", "500ms"
	Retries   int                `json:"retries,omitempty"`
	Interface *InterfaceContract `json:"interface,omitempty"`
}

NATSRequestPort - NATS Request/Response pattern for synchronous operations

func (NATSRequestPort) IsExclusive

func (n NATSRequestPort) IsExclusive() bool

IsExclusive returns false as multiple components can handle requests

func (NATSRequestPort) ResourceID

func (n NATSRequestPort) ResourceID() string

ResourceID returns unique identifier for NATS request ports

func (NATSRequestPort) Type

func (n NATSRequestPort) Type() string

Type returns the port type identifier

type NATSRequestPortConfig

type NATSRequestPortConfig struct {
	Subject string `json:"subject"`
	Timeout string `json:"timeout,omitempty"`
}

NATSRequestPortConfig represents a NATS request/reply port configuration. It mirrors the Subject and Timeout fields of NATSRequestPort and exists for test compatibility.

func (NATSRequestPortConfig) IsExclusive

func (n NATSRequestPortConfig) IsExclusive() bool

IsExclusive returns false as multiple components can handle requests

func (NATSRequestPortConfig) ResourceID

func (n NATSRequestPortConfig) ResourceID() string

ResourceID returns unique identifier for NATS request ports

func (NATSRequestPortConfig) Type

func (n NATSRequestPortConfig) Type() string

Type returns the port type identifier

type NATSStreamPortConfig

type NATSStreamPortConfig struct {
	Subject  string `json:"subject"`
	Consumer string `json:"consumer,omitempty"`
}

NATSStreamPortConfig represents a NATS streaming port configuration. It is used for stream-based message delivery patterns.

func (NATSStreamPortConfig) IsExclusive

func (n NATSStreamPortConfig) IsExclusive() bool

IsExclusive returns false as multiple components can subscribe

func (NATSStreamPortConfig) ResourceID

func (n NATSStreamPortConfig) ResourceID() string

ResourceID returns unique identifier for NATS stream ports

func (NATSStreamPortConfig) Type

func (n NATSStreamPortConfig) Type() string

Type returns the port type identifier

type NetworkPort

type NetworkPort struct {
	Protocol string `json:"protocol"` // "tcp", "udp"
	Host     string `json:"host"`     // "0.0.0.0", "localhost"
	Port     int    `json:"port"`     // 14550, 8080
}

NetworkPort - TCP/UDP network bindings

func (NetworkPort) IsExclusive

func (n NetworkPort) IsExclusive() bool

IsExclusive returns true as network ports are exclusive

func (NetworkPort) ResourceID

func (n NetworkPort) ResourceID() string

ResourceID returns unique identifier for network ports

func (NetworkPort) Type

func (n NetworkPort) Type() string

Type returns the port type identifier

type NoOpLifecycleReporter

type NoOpLifecycleReporter struct{}

NoOpLifecycleReporter is a no-op implementation for when lifecycle reporting is disabled.

func NewNoOpLifecycleReporter

func NewNoOpLifecycleReporter() *NoOpLifecycleReporter

NewNoOpLifecycleReporter creates a no-op lifecycle reporter.

func (*NoOpLifecycleReporter) ReportCycleComplete

func (r *NoOpLifecycleReporter) ReportCycleComplete(_ context.Context) error

ReportCycleComplete is a no-op.

func (*NoOpLifecycleReporter) ReportCycleError

func (r *NoOpLifecycleReporter) ReportCycleError(_ context.Context, _ error) error

ReportCycleError is a no-op.

func (*NoOpLifecycleReporter) ReportCycleStart

func (r *NoOpLifecycleReporter) ReportCycleStart(_ context.Context) error

ReportCycleStart is a no-op.

func (*NoOpLifecycleReporter) ReportStage

func (r *NoOpLifecycleReporter) ReportStage(_ context.Context, _ string) error

ReportStage is a no-op.

type PayloadBuilder

type PayloadBuilder func(fields map[string]any) (any, error)

PayloadBuilder creates a typed payload from field mappings. Used by workflow variable interpolation to construct typed payloads from step output maps. Returns error if required fields are missing or field values cannot be converted to the target type. Returns any to avoid import cycles - the actual payload should implement message.Payload.

OPTIONAL: If not provided, BuildPayload falls back to JSON marshal/unmarshal using the Factory to create the target type. Custom builders are only needed for performance optimization of high-frequency payload types.

type PayloadFactory

type PayloadFactory func() any

PayloadFactory creates a payload instance for a specific message type. The factory returns an any to avoid import cycles. The actual payload should implement the message.Payload interface.

type PayloadRegistration

type PayloadRegistration struct {
	Factory     PayloadFactory `json:"-"`           // Factory function (not serializable)
	Builder     PayloadBuilder `json:"-"`           // Builder function (not serializable)
	Domain      string         `json:"domain"`      // Message domain (e.g., "robotics", "sensors")
	Category    string         `json:"category"`    // Message category (e.g., "heartbeat", "gps")
	Version     string         `json:"version"`     // Schema version (e.g., "v1", "v2")
	Description string         `json:"description"` // Human-readable description
	Example     map[string]any `json:"example"`     // Optional example payload data
}

PayloadRegistration holds factory and metadata for a payload type. This follows the same pattern as component Registration but is specific to message payload types.

func (*PayloadRegistration) MessageType

func (pr *PayloadRegistration) MessageType() string

MessageType returns the formatted message type string for this registration. Format: "domain.category.version" (e.g., "robotics.heartbeat.v1")

type PayloadRegistry

type PayloadRegistry struct {
	// contains filtered or unexported fields
}

PayloadRegistry manages payload factories for message deserialization. It provides thread-safe registration and lookup of payload factories, enabling BaseMessage.UnmarshalJSON to recreate typed payloads from JSON.

The registry follows the same patterns as the component Registry but is specifically designed for message payload types.

func GlobalPayloadRegistry

func GlobalPayloadRegistry() *PayloadRegistry

GlobalPayloadRegistry returns the global payload registry. This is useful for introspection, such as listing all registered payloads.

func NewPayloadRegistry

func NewPayloadRegistry() *PayloadRegistry

NewPayloadRegistry creates a new empty payload registry.

func (*PayloadRegistry) BuildPayload

func (pr *PayloadRegistry) BuildPayload(domain, category, version string, fields map[string]any) (any, error)

BuildPayload creates a typed payload from field mappings. If a custom Builder is registered, it is used for efficient field mapping. Otherwise, falls back to JSON marshal/unmarshal using the Factory.

Returns an error if the message type is not registered or if building fails. This is used by workflow variable interpolation to construct typed payloads from step output maps. Returns any to avoid import cycles - the actual payload implements message.Payload.
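The JSON fallback described above can be sketched as a marshal/unmarshal round trip through a factory-created value. This is an assumption about the general technique, not the package's code; `heartbeat`, `buildViaJSON`, and `buildHeartbeat` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// heartbeat is a hypothetical payload type used only for illustration.
type heartbeat struct {
	DeviceID string `json:"device_id"`
	Battery  int    `json:"battery"`
}

// buildViaJSON marshals the field map and unmarshals it into the value
// produced by factory, which must return a pointer.
func buildViaJSON(factory func() any, fields map[string]any) (any, error) {
	raw, err := json.Marshal(fields)
	if err != nil {
		return nil, fmt.Errorf("marshal fields: %w", err)
	}
	target := factory()
	if err := json.Unmarshal(raw, target); err != nil {
		return nil, fmt.Errorf("unmarshal into %T: %w", target, err)
	}
	return target, nil
}

// buildHeartbeat runs the round trip for the hypothetical heartbeat type.
func buildHeartbeat(fields map[string]any) (*heartbeat, error) {
	p, err := buildViaJSON(func() any { return &heartbeat{} }, fields)
	if err != nil {
		return nil, err
	}
	return p.(*heartbeat), nil
}

func main() {
	hb, err := buildHeartbeat(map[string]any{"device_id": "drone-1", "battery": 87})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *hb) // {DeviceID:drone-1 Battery:87}
}
```

The round trip costs an extra encode and decode per payload, which is why a custom Builder is worth writing only for high-frequency types.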

func (*PayloadRegistry) CreatePayload

func (pr *PayloadRegistry) CreatePayload(domain, category, version string) any

CreatePayload creates a payload instance using the registered factory. Returns nil if the message type is not registered. This allows BaseMessage.UnmarshalJSON to handle unknown types gracefully by falling back to GenericPayload.

func (*PayloadRegistry) GetRegistration

func (pr *PayloadRegistry) GetRegistration(msgType string) (*PayloadRegistration, bool)

GetRegistration returns the payload registration for a specific message type. Returns the registration and true if found, nil and false otherwise.

func (*PayloadRegistry) ListByDomain

func (pr *PayloadRegistry) ListByDomain(domain string) []*PayloadRegistration

ListByDomain returns all payload registrations for a specific domain. This is useful for discovering what message types are available within a particular domain (e.g., "robotics", "sensors").

func (*PayloadRegistry) ListPayloads

func (pr *PayloadRegistry) ListPayloads() map[string]*PayloadRegistration

ListPayloads returns all registered payload types. Returns a copy of the registrations map to prevent external modification.

func (*PayloadRegistry) RegisterPayload

func (pr *PayloadRegistry) RegisterPayload(registration *PayloadRegistration) error

RegisterPayload registers a payload factory with validation. The message type is derived from the registration's Domain, Category, and Version fields. Returns an error if validation fails or the type is already registered.
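The register and create semantics documented for PayloadRegistry can be sketched with a small, mutex-guarded map keyed by "domain.category.version". The types below are local stand-ins written for illustration; the exact validation rules of the real registry are an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// registration is a trimmed stand-in for PayloadRegistration.
type registration struct {
	Factory  func() any
	Domain   string
	Category string
	Version  string
}

// messageType formats the key as "domain.category.version".
func (r *registration) messageType() string {
	return r.Domain + "." + r.Category + "." + r.Version
}

// payloadRegistry is a minimal sketch, not the package's implementation.
type payloadRegistry struct {
	mu    sync.RWMutex
	items map[string]*registration
}

func newPayloadRegistry() *payloadRegistry {
	return &payloadRegistry{items: map[string]*registration{}}
}

// register validates the registration and rejects duplicates.
func (pr *payloadRegistry) register(r *registration) error {
	if r == nil || r.Factory == nil {
		return errors.New("registration and factory are required")
	}
	if r.Domain == "" || r.Category == "" || r.Version == "" {
		return errors.New("domain, category and version are required")
	}
	pr.mu.Lock()
	defer pr.mu.Unlock()
	key := r.messageType()
	if _, exists := pr.items[key]; exists {
		return fmt.Errorf("payload type %q already registered", key)
	}
	pr.items[key] = r
	return nil
}

// create returns a new payload, or nil for an unknown message type.
func (pr *payloadRegistry) create(domain, category, version string) any {
	pr.mu.RLock()
	defer pr.mu.RUnlock()
	r, ok := pr.items[domain+"."+category+"."+version]
	if !ok {
		return nil
	}
	return r.Factory()
}

func main() {
	reg := newPayloadRegistry()
	hb := &registration{
		Factory:  func() any { return &struct{ Battery int }{} },
		Domain:   "robotics",
		Category: "heartbeat",
		Version:  "v1",
	}
	fmt.Println(reg.register(hb) == nil)                    // true
	fmt.Println(reg.register(hb) != nil)                    // true: duplicate rejected
	fmt.Println(reg.create("robotics", "gps", "v1") == nil) // true: unknown type
}
```

Returning nil rather than an error for unknown types lets a caller fall back to a generic payload without special error handling.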

type PlatformMeta

type PlatformMeta = types.PlatformMeta

PlatformMeta provides platform identity to components. Type alias to avoid import cycles while maintaining compatibility.

type Port

type Port struct {
	Name        string    `json:"name"`
	Direction   Direction `json:"direction"`
	Required    bool      `json:"required"`
	Description string    `json:"description"`
	Config      Portable  `json:"config"`
}

Port describes any I/O interface

func BuildPortFromDefinition

func BuildPortFromDefinition(def PortDefinition, direction Direction) Port

BuildPortFromDefinition creates a Port from a PortDefinition

func MergePortConfigs

func MergePortConfigs(defaults []Port, overrides []PortDefinition, direction Direction) []Port

MergePortConfigs merges default ports with configured overrides

func (Port) MarshalJSON

func (p Port) MarshalJSON() ([]byte, error)

MarshalJSON provides custom JSON marshaling for the Port struct. It handles the Portable interface by creating a wrapper with type information.

func (*Port) UnmarshalJSON

func (p *Port) UnmarshalJSON(data []byte) error

UnmarshalJSON provides custom JSON unmarshaling for the Port struct. It handles reconstruction of the Portable interface from JSON.
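Marshaling an interface-typed field through a wrapper that carries a type tag can be sketched as below. The wire format (`name`, `type`, `config`) and the type strings "nats" and "network" are assumptions made for this example; the package's actual JSON shape may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// portable is a trimmed stand-in for the Portable interface.
type portable interface{ Type() string }

type natsPort struct {
	Subject string `json:"subject"`
}

func (natsPort) Type() string { return "nats" }

type networkPort struct {
	Protocol string `json:"protocol"`
	Port     int    `json:"port"`
}

func (networkPort) Type() string { return "network" }

// port holds an interface-typed config, like Port.Config.
type port struct {
	Name   string
	Config portable
}

// wire is the hypothetical on-the-wire shape: the config plus a type tag.
type wire struct {
	Name   string          `json:"name"`
	Type   string          `json:"type"`
	Config json.RawMessage `json:"config"`
}

func (p port) MarshalJSON() ([]byte, error) {
	if p.Config == nil {
		return nil, fmt.Errorf("port %q has no config", p.Name)
	}
	cfg, err := json.Marshal(p.Config)
	if err != nil {
		return nil, err
	}
	return json.Marshal(wire{Name: p.Name, Type: p.Config.Type(), Config: cfg})
}

func (p *port) UnmarshalJSON(data []byte) error {
	var w wire
	if err := json.Unmarshal(data, &w); err != nil {
		return err
	}
	p.Name = w.Name
	switch w.Type { // choose the concrete type from the tag
	case "nats":
		var c natsPort
		if err := json.Unmarshal(w.Config, &c); err != nil {
			return err
		}
		p.Config = c
	case "network":
		var c networkPort
		if err := json.Unmarshal(w.Config, &c); err != nil {
			return err
		}
		p.Config = c
	default:
		return fmt.Errorf("unknown port type %q", w.Type)
	}
	return nil
}

// roundTrip encodes a port and decodes it again.
func roundTrip(in port) (port, string, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return port{}, "", err
	}
	var out port
	err = json.Unmarshal(raw, &out)
	return out, string(raw), err
}

func main() {
	out, raw, err := roundTrip(port{Name: "telemetry", Config: natsPort{Subject: "sensors.gps"}})
	fmt.Println(raw)
	fmt.Printf("%T %v\n", out.Config, err)
}
```

Without the type tag, the decoder would have no way to decide which concrete type to place behind the interface.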

type PortCapability

type PortCapability struct {
	Name        string `json:"name"`
	Subject     string `json:"subject"`
	Type        string `json:"type"`
	Interface   string `json:"interface,omitempty"`
	Description string `json:"description,omitempty"`
}

PortCapability describes an input or output port for discovery.

type PortConfig

type PortConfig struct {
	Inputs  []PortDefinition `json:"inputs,omitempty"`
	Outputs []PortDefinition `json:"outputs,omitempty"`
	KVWrite []PortDefinition `json:"kv_write,omitempty"`
}

PortConfig represents port configuration in component config

type PortDefinition

type PortDefinition struct {
	Name        string `json:"name"                  schema:"readonly,type:string,description:Port identifier"`
	Type        string `json:"type,omitempty"        schema:"readonly,type:string,description:Port type (nats jetstream kv-watch etc)"`
	Subject     string `json:"subject,omitempty"     schema:"editable,type:string,description:NATS subject pattern or network address"`
	Interface   string `json:"interface,omitempty"   schema:"readonly,type:string,description:Interface contract type"`
	Required    bool   `json:"required,omitempty"    schema:"readonly,type:bool,description:Whether port connection is required"`
	Description string `json:"description,omitempty" schema:"readonly,type:string,description:Human-readable port description"`
	Timeout     string `json:"timeout,omitempty"     schema:"editable,type:string,description:Request timeout for request/reply ports"`
	StreamName  string `json:"stream_name,omitempty" schema:"editable,type:string,description:JetStream stream name"`
	Bucket      string `json:"bucket,omitempty"      schema:"editable,type:string,description:KV bucket name for KV ports"`

	// Config holds type-specific port configuration (e.g., JetStreamPort for consumer settings)
	Config any `json:"config,omitempty" schema:"editable,type:object,description:Type-specific port configuration"`
}

PortDefinition represents a port configuration from JSON

type PortFieldInfo

type PortFieldInfo struct {
	Type     string `json:"type"`
	Editable bool   `json:"editable"`
}

PortFieldInfo describes metadata for PortDefinition fields

type Portable

type Portable interface {
	ResourceID() string // Unique identifier for conflict detection
	IsExclusive() bool  // True if the resource cannot be shared between components
	Type() string       // Port type identifier
}

Portable interface - minimal, no Get prefix (Go idiomatic)
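One way ResourceID and IsExclusive could be combined for conflict detection is sketched below. The `netPort` and `natsPort` types, their ResourceID formats, and `findConflicts` are hypothetical; the package's own conflict check is not shown in this documentation.

```go
package main

import "fmt"

// Portable is copied from the interface above.
type Portable interface {
	ResourceID() string
	IsExclusive() bool
	Type() string
}

// netPort and natsPort are simplified stand-ins; the ID formats are assumed.
type netPort struct {
	Protocol string
	Port     int
}

func (n netPort) ResourceID() string { return fmt.Sprintf("%s:%d", n.Protocol, n.Port) }
func (n netPort) IsExclusive() bool  { return true }
func (n netPort) Type() string       { return "network" }

type natsPort struct{ Subject string }

func (n natsPort) ResourceID() string { return "nats:" + n.Subject }
func (n natsPort) IsExclusive() bool  { return false }
func (n natsPort) Type() string       { return "nats" }

// findConflicts returns the resource IDs that are claimed more than once
// where at least one claimant is exclusive, in first-seen order.
func findConflicts(ports []Portable) []string {
	type claim struct {
		count     int
		exclusive bool
	}
	claims := map[string]*claim{}
	var order []string
	for _, p := range ports {
		id := p.ResourceID()
		c, ok := claims[id]
		if !ok {
			c = &claim{}
			claims[id] = c
			order = append(order, id)
		}
		c.count++
		c.exclusive = c.exclusive || p.IsExclusive()
	}
	var conflicts []string
	for _, id := range order {
		if c := claims[id]; c.count > 1 && c.exclusive {
			conflicts = append(conflicts, id)
		}
	}
	return conflicts
}

func main() {
	ports := []Portable{
		netPort{"udp", 14550},
		netPort{"udp", 14550},
		natsPort{"sensors.gps"},
		natsPort{"sensors.gps"},
	}
	fmt.Println(findConflicts(ports)) // [udp:14550]
}
```

Two subscribers on the same NATS subject are not reported, because neither claims the resource exclusively.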

type ProcessorMetrics

type ProcessorMetrics struct {
	// EventsProcessed counts total events processed, labeled by operation type
	EventsProcessed *prometheus.CounterVec

	// EventsErrors counts total processing errors, labeled by error type
	EventsErrors *prometheus.CounterVec

	// KVOperations counts KV bucket operations, labeled by operation (get/put/delete/watch)
	KVOperations *prometheus.CounterVec

	// ProcessingDuration measures processing latency in seconds
	ProcessingDuration prometheus.Histogram
	// contains filtered or unexported fields
}

ProcessorMetrics provides standard Prometheus metrics for processor components. Each processor component should create its own instance with a unique subsystem name.

func NewProcessorMetrics

func NewProcessorMetrics(registry *metric.MetricsRegistry, subsystem string) *ProcessorMetrics

NewProcessorMetrics creates and registers processor metrics with the given subsystem name. The subsystem name should be the component name with underscores (e.g., "graph_ingest"). If registry is nil, metrics are created but not registered (useful for testing).

func (*ProcessorMetrics) ObserveDuration

func (m *ProcessorMetrics) ObserveDuration(seconds float64)

ObserveDuration records a processing duration

func (*ProcessorMetrics) RecordError

func (m *ProcessorMetrics) RecordError(errorType string)

RecordError increments the error counter for the given error type

func (*ProcessorMetrics) RecordEvent

func (m *ProcessorMetrics) RecordEvent(operation string)

RecordEvent increments the events processed counter for the given operation

func (*ProcessorMetrics) RecordKVOperation

func (m *ProcessorMetrics) RecordKVOperation(operation string)

RecordKVOperation increments the KV operations counter

type PropertySchema

type PropertySchema struct {
	Type        string                    `json:"type"` // "string", "int", "bool", "float", "enum", "array", "object", "ports", "cache"
	Description string                    `json:"description"`
	Default     any                       `json:"default,omitempty"`
	Enum        []string                  `json:"enum,omitempty"`        // Valid string values
	Minimum     *int                      `json:"minimum,omitempty"`     // For numeric types
	Maximum     *int                      `json:"maximum,omitempty"`     // For numeric types
	Category    string                    `json:"category,omitempty"`    // "basic" or "advanced" for UI organization
	PortFields  map[string]PortFieldInfo  `json:"portFields,omitempty"`  // Metadata for port fields (when type is "ports")
	CacheFields map[string]CacheFieldInfo `json:"cacheFields,omitempty"` // Metadata for cache fields (when type is "cache")
	Properties  map[string]PropertySchema `json:"properties,omitempty"`  // Nested properties for object types
	Required    []string                  `json:"required,omitempty"`    // Required nested fields for object types
	Items       *PropertySchema           `json:"items,omitempty"`       // Item schema for array types
}

PropertySchema describes a single configuration property

type Registerable

type Registerable interface {
	Registration() Registration
}

Registerable allows components to self-describe for registry registration

type Registration

type Registration struct {
	Name         string       `json:"name"`         // Factory name (e.g., "udp-input")
	Type         string       `json:"type"`         // Component type (input/processor/output/storage)
	Protocol     string       `json:"protocol"`     // Technical protocol (udp, mavlink, websocket, etc.)
	Domain       string       `json:"domain"`       // Business domain (robotics, semantic, network, storage)
	Description  string       `json:"description"`  // Human-readable description
	Version      string       `json:"version"`      // Component version
	Schema       ConfigSchema `json:"schema"`       // Schema as static metadata (Feature 011)
	Factory      Factory      `json:"-"`            // Factory function (not serializable)
	Dependencies []string     `json:"dependencies"` // Optional: other required components
}

Registration holds factory and metadata for a component type

type RegistrationConfig

type RegistrationConfig struct {
	Name        string       // Component name (e.g., "udp", "websocket", "graph-processor")
	Factory     Factory      // Factory function to create component instances
	Schema      ConfigSchema // Configuration schema for validation and discovery
	Type        string       // Component type: "input", "processor", "output", "storage"
	Protocol    string       // Technical protocol (udp, tcp, websocket, file, etc.)
	Domain      string       // Business domain (network, storage, processing, robotics, semantic)
	Description string       // Human-readable description of the component
	Version     string       // Component version (semver recommended)
}

RegistrationConfig provides a clean API for component registration. This config struct replaces the previous 7-8 parameter function signatures. Each of its fields maps directly to the Registration field of the same name; Registration additionally carries Dependencies.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages component factories and instances. It provides thread-safe registration and lookup of both factories (for creation) and instances (for discovery and management).

func NewRegistry

func NewRegistry(opts ...func(*Registry)) *Registry

NewRegistry creates a new empty component registry. It accepts optional functional options, for example to supply a logger; the logger defaults to slog.Default() if none is provided. This maintains backwards compatibility with existing callers.

func (*Registry) Component

func (r *Registry) Component(name string) Discoverable

Component retrieves a specific component instance by name. Returns nil if the component is not found.

func (*Registry) CreateComponent

func (r *Registry) CreateComponent(
	instanceName string, config types.ComponentConfig, deps Dependencies,
) (Discoverable, error)

CreateComponent creates and registers a new component instance. The instanceName parameter is the unique identifier for this instance (e.g., "udp-sensor-main"). The config contains the factory name, type, and component-specific configuration, and deps carries the dependencies passed to the factory. Factory functions don't do I/O, so no context is needed.

func (*Registry) CreatePayload

func (r *Registry) CreatePayload(domain, category, version string) any

CreatePayload creates a payload instance using the registered factory. Returns nil if the message type is not registered.

func (*Registry) GetCapabilities

func (r *Registry) GetCapabilities(subjectPattern string) []*CapabilityAnnouncement

GetCapabilities returns capabilities matching the subject pattern. Supports NATS wildcards: "*" matches one token, ">" matches one or more tokens at end. Returns empty slice (not nil) when no matches or cache empty. Thread-safe for concurrent access.
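The wildcard rules stated above ("*" for exactly one token, ">" for one or more trailing tokens) can be sketched as a token-by-token comparison. `subjectMatches` is a hypothetical helper written for this example and does not validate malformed subjects such as ones with empty tokens.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches a NATS-style pattern.
// "*" matches exactly one token; ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least
			// one subject token left to consume.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("sensors.*", "sensors.gps"))     // true
	fmt.Println(subjectMatches("sensors.*", "sensors.gps.raw")) // false
	fmt.Println(subjectMatches("sensors.>", "sensors.gps.raw")) // true
	fmt.Println(subjectMatches("sensors.>", "sensors"))         // false
}
```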

func (*Registry) GetComponent

func (r *Registry) GetComponent(name string) (Discoverable, error)

GetComponent retrieves a component instance by factory type name (for schema retrieval). Deprecated: use GetComponentSchema() instead for schema retrieval. This method creates a temporary component instance, which fails for components with dependency validation.

func (*Registry) GetComponentSchema

func (r *Registry) GetComponentSchema(name string) (ConfigSchema, error)

GetComponentSchema retrieves a component's schema directly from Registration metadata, without instantiating the component (Feature 011 - Option 1). The schema is stored as static metadata during registration, which avoids dependency validation issues.

func (*Registry) GetFactory

func (r *Registry) GetFactory(name string) (Factory, bool)

GetFactory returns a specific factory by name. Unlike ListFactories, this returns the actual Factory function for creating components.

func (*Registry) InitNATS

func (r *Registry) InitNATS(ctx context.Context, client *natsclient.Client, nodeID string) error

InitNATS initializes NATS JetStream capability discovery using natsclient.Client. Creates the COMPONENT_CAPABILITIES stream if it doesn't exist.

func (*Registry) ListAvailable

func (r *Registry) ListAvailable() map[string]Info

ListAvailable returns information about all available component types. This provides metadata about what types of components can be created.

func (*Registry) ListComponentTypes

func (r *Registry) ListComponentTypes() []string

ListComponentTypes returns all registered component factory type names. These are factory names (e.g., "udp-input", "websocket-output"), not instance names.

func (*Registry) ListComponents

func (r *Registry) ListComponents() map[string]Discoverable

ListComponents returns all registered component instances. This is used by the discovery service to provide information about currently running components.

func (*Registry) ListFactories

func (r *Registry) ListFactories() map[string]*Registration

ListFactories returns all registered component factories. This provides information about what types of components can be created.

func (*Registry) ListPayloads

func (r *Registry) ListPayloads() map[string]*PayloadRegistration

ListPayloads returns all registered payload types.

func (*Registry) RegisterFactory

func (r *Registry) RegisterFactory(name string, registration *Registration) error

RegisterFactory registers a component factory with the given name. Returns an error if a factory with the same name is already registered.

func (*Registry) RegisterInstance

func (r *Registry) RegisterInstance(name string, component Discoverable) error

RegisterInstance registers a component instance with the given name. This allows the instance to be discovered and managed. Returns an error if an instance with the same name is already registered.

func (*Registry) RegisterPayload

func (r *Registry) RegisterPayload(registration *PayloadRegistration) error

RegisterPayload registers a payload factory with the registry. This allows typed payloads to be recreated during message deserialization.

func (*Registry) RegisterWithConfig

func (r *Registry) RegisterWithConfig(config RegistrationConfig) error

RegisterWithConfig registers a component using a configuration struct. This is the recommended registration method that replaces the multi-parameter functions.

Example usage:

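// Check the returned error: registration can fail.
if err := registry.RegisterWithConfig(component.RegistrationConfig{
    Name:        "udp",
    Factory:     CreateUDPInput,
    Schema:      udpSchema,
    Type:        "input",
    Protocol:    "udp",
    Domain:      "network",
    Description: "UDP input component for receiving network data",
    Version:     "1.0.0",
}); err != nil {
    return fmt.Errorf("register udp input: %w", err)
}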

func (*Registry) StartHeartbeat

func (r *Registry) StartHeartbeat(ctx context.Context, interval time.Duration)

StartHeartbeat starts periodic republishing of all component capabilities.

func (*Registry) StopHeartbeat

func (r *Registry) StopHeartbeat()

StopHeartbeat stops the heartbeat goroutine.

func (*Registry) SubscribeCapabilities

func (r *Registry) SubscribeCapabilities(ctx context.Context, patterns ...string) error

SubscribeCapabilities subscribes to capability announcements from NATS. If no patterns provided, defaults to "*.capabilities.*" (all components).

func (*Registry) UnregisterInstance

func (r *Registry) UnregisterInstance(name string)

UnregisterInstance removes a component instance from the registry. This is typically called when a component is stopped or destroyed.

func (*Registry) WaitForCapabilities

func (r *Registry) WaitForCapabilities(ctx context.Context, pattern string, minCount int, timeout time.Duration) error

WaitForCapabilities waits until at least minCount capabilities matching pattern have been discovered. Returns immediately if len(GetCapabilities(pattern)) >= minCount. Returns ctx.Err() on context cancellation. Returns nil on timeout: the caller proceeds anyway, and a timeout is deliberately not treated as an error. Polls every 100ms.
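The documented return behavior (nil on success, nil on timeout, ctx.Err() on cancellation) can be sketched as a polling loop. `waitForCount` and `demo` are hypothetical; the poll interval is a parameter here so the example runs quickly, whereas the method is documented to poll every 100ms.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForCount polls count until it reaches minCount. It returns nil on
// success and on timeout, and ctx.Err() if the context ends first.
func waitForCount(ctx context.Context, count func() int, minCount int, timeout, poll time.Duration) error {
	if count() >= minCount {
		return nil // already satisfied
	}
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return nil // a timeout is not an error: proceed anyway
		case <-ticker.C:
			if count() >= minCount {
				return nil
			}
		}
	}
}

// demo exercises the three outcomes with short intervals.
func demo() (timeoutErr, cancelErr error, polls int) {
	never := func() int { return 0 }
	timeoutErr = waitForCount(context.Background(), never, 1, 20*time.Millisecond, 5*time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before waiting
	cancelErr = waitForCount(ctx, never, 1, time.Second, 5*time.Millisecond)

	growing := func() int { polls++; return polls }
	_ = waitForCount(context.Background(), growing, 3, time.Second, time.Millisecond)
	return timeoutErr, cancelErr, polls
}

func main() {
	timeoutErr, cancelErr, polls := demo()
	fmt.Println(timeoutErr, cancelErr, polls) // <nil> context canceled 3
}
```

Since a timeout returns nil, a caller that needs to know whether the minimum was reached has to check GetCapabilities again afterwards.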

type SchemaDirectives

type SchemaDirectives struct {
	// core (required)
	Type        string // REQUIRED - field type
	Description string // REQUIRED (warning if missing)

	// UI Organization
	Category string // "basic" or "advanced"
	ReadOnly bool   // For PortDefinition fields
	Editable bool   // For PortDefinition fields
	Hidden   bool   // Hide from UI

	// Constraints
	Default  any      // Type-specific default value (stored as string, converted during schema generation)
	Required bool     // Field must be provided
	Min      *int     // Numeric minimum
	Max      *int     // Numeric maximum
	Enum     []string // Valid enum values

	// Future extensions (stored but not used yet)
	Help        string
	Placeholder string
	Pattern     string
	Format      string
}

SchemaDirectives represents parsed schema tag directives

func ParseSchemaTag

func ParseSchemaTag(tag string) (SchemaDirectives, error)

ParseSchemaTag parses a schema struct tag into directives.

Tag Syntax:

  • Directives are comma-separated
  • Key-value pairs use colon: "key:value"
  • Boolean flags have no colon: "readonly", "required"
  • Enum values are pipe-separated: "enum:val1|val2|val3"
  • Whitespace is trimmed from all values

Required Directives:

  • type: Field data type (string, int, bool, float, enum, array, object, ports)

Recommended Directives:

  • description: Human-readable field description (used for UI and documentation)

Optional Directives:

  • category: UI organization (basic, advanced)
  • default: Default value (converted to appropriate type)
  • min/max: Numeric constraints
  • enum: Valid values for enum types (pipe-separated)
  • readonly: Field is read-only (boolean flag)
  • editable: Field is user-editable (boolean flag)
  • hidden: Field is hidden from UI (boolean flag)
  • required: Field must be provided (boolean flag)

Example Tags:

schema:"type:string,description:Component name,category:basic"
schema:"type:int,description:Port,min:1,max:65535,default:8080"
schema:"type:enum,description:Level,enum:debug|info|warn,default:info"
schema:"required,type:string,description:API key"
schema:"readonly,type:string,description:System ID"

Returns an error if:

  • Tag is empty
  • Type directive is missing
  • Type value is invalid
  • Directive syntax is malformed
  • Numeric values cannot be parsed

See SCHEMA_TAG_SPEC.md for complete specification.

Example

ExampleParseSchemaTag demonstrates parsing individual schema tags

package main

import (
	"fmt"

	"github.com/c360studio/semstreams/component"
)

func main() {
	// Parse a simple field tag
	tag := "type:int,description:Port number,min:1,max:65535,default:8080"
	directives, err := component.ParseSchemaTag(tag)
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Type: %s\n", directives.Type)
	fmt.Printf("Description: %s\n", directives.Description)
	fmt.Printf("Min: %d\n", *directives.Min)
	fmt.Printf("Max: %d\n", *directives.Max)
	fmt.Printf("Default: %s\n", directives.Default)

}
Output:

Type: int
Description: Port number
Min: 1
Max: 65535
Default: 8080

Example (Enum)

ExampleParseSchemaTag_enum demonstrates parsing enum tags

package main

import (
	"fmt"

	"github.com/c360studio/semstreams/component"
)

func main() {
	tag := "type:enum,description:Log level,enum:debug|info|warn|error,default:info"
	directives, _ := component.ParseSchemaTag(tag)

	fmt.Printf("Type: %s\n", directives.Type)
	fmt.Printf("Description: %s\n", directives.Description)
	fmt.Printf("Enum values: %v\n", directives.Enum)
	fmt.Printf("Default: %s\n", directives.Default)

}
Output:

Type: enum
Description: Log level
Enum values: [debug info warn error]
Default: info

Example (Flags)

ExampleParseSchemaTag_flags demonstrates boolean flags

package main

import (
	"fmt"

	"github.com/c360studio/semstreams/component"
)

func main() {
	tag := "required,readonly,type:string,description:System identifier"
	directives, _ := component.ParseSchemaTag(tag)

	fmt.Printf("Type: %s\n", directives.Type)
	fmt.Printf("Required: %v\n", directives.Required)
	fmt.Printf("ReadOnly: %v\n", directives.ReadOnly)

}
Output:

Type: string
Required: true
ReadOnly: true
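Schema tags live on struct fields, so they are typically read with reflection before being parsed. The sketch below shows that step with a simplified splitter; `udpConfig`, `splitTag`, and `schemaTags` are hypothetical, and `splitTag` performs none of the validation that ParseSchemaTag is documented to do.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// udpConfig is a hypothetical component config carrying schema tags.
type udpConfig struct {
	Port  int    `json:"port" schema:"type:int,description:Listen port,min:1,max:65535,default:14550"`
	Level string `json:"level" schema:"type:enum,description:Log level,enum:debug|info|warn"`
	ID    string `json:"id" schema:"readonly,type:string,description:System ID"`
}

// splitTag is a simplified stand-in for ParseSchemaTag: it returns the
// key:value directives and the bare boolean flags without validation.
func splitTag(tag string) (values map[string]string, flags []string) {
	values = map[string]string{}
	for _, part := range strings.Split(tag, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		key, val, hasValue := strings.Cut(part, ":")
		if !hasValue {
			flags = append(flags, key)
			continue
		}
		values[strings.TrimSpace(key)] = strings.TrimSpace(val)
	}
	return values, flags
}

// schemaTags collects the raw schema tag of each field, keyed by JSON name.
// v must be a struct value.
func schemaTags(v any) map[string]string {
	out := map[string]string{}
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if tag, ok := f.Tag.Lookup("schema"); ok {
			out[f.Tag.Get("json")] = tag
		}
	}
	return out
}

func main() {
	tags := schemaTags(udpConfig{})

	values, _ := splitTag(tags["port"])
	fmt.Println(values["type"], values["min"], values["max"]) // int 1 65535

	values, flags := splitTag(tags["id"])
	fmt.Println(values["type"], flags) // string [readonly]

	values, _ = splitTag(tags["level"])
	fmt.Println(strings.Split(values["enum"], "|")) // [debug info warn]
}
```

Because directives are comma-separated, a description containing a comma would be split in this sketch; the tags shown in this documentation avoid commas inside descriptions.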

type SimpleMockComponent

type SimpleMockComponent struct {
	// contains filtered or unexported fields
}

SimpleMockComponent is a simple component implementation for testing

func (*SimpleMockComponent) ConfigSchema

func (m *SimpleMockComponent) ConfigSchema() ConfigSchema

ConfigSchema returns the component's configuration schema

func (*SimpleMockComponent) DataFlow

func (m *SimpleMockComponent) DataFlow() FlowMetrics

DataFlow returns the component's data flow metrics (zeros for mock)

func (*SimpleMockComponent) Health

func (m *SimpleMockComponent) Health() HealthStatus

Health returns the component's health status (always healthy for mock)

func (*SimpleMockComponent) InputPorts

func (m *SimpleMockComponent) InputPorts() []Port

InputPorts returns the component's input ports (none for mock)

func (*SimpleMockComponent) Meta

func (m *SimpleMockComponent) Meta() Metadata

Meta returns the component metadata

func (*SimpleMockComponent) OutputPorts

func (m *SimpleMockComponent) OutputPorts() []Port

OutputPorts returns the component's output ports (none for mock)

type State

type State int

State represents the current lifecycle state of a component

const (
	// StateCreated indicates component was created but not initialized
	StateCreated State = iota
	// StateInitialized indicates component was initialized but not started
	StateInitialized
	// StateStarted indicates component is running
	StateStarted
	// StateStopped indicates component was stopped
	StateStopped
	// StateFailed indicates component failed during lifecycle operation
	StateFailed
)

func (State) String

func (cs State) String() string

String returns a string representation of the component state
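A String method for an iota-based enum is commonly backed by a lookup table with a fallback for out-of-range values. The labels below are assumptions chosen for illustration; the strings actually returned by the package are not listed in this documentation.

```go
package main

import "fmt"

// State mirrors the enum above.
type State int

const (
	StateCreated State = iota
	StateInitialized
	StateStarted
	StateStopped
	StateFailed
)

// stateNames holds assumed labels; the package's actual strings may differ.
var stateNames = [...]string{"created", "initialized", "started", "stopped", "failed"}

// String returns the label for s, or a numeric fallback for unknown values.
func (s State) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return fmt.Sprintf("unknown(%d)", int(s))
	}
	return stateNames[s]
}

func main() {
	fmt.Println(StateStarted, State(42)) // started unknown(42)
}
```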

type Status

type Status struct {
	Component       string    `json:"component"`
	Stage           string    `json:"stage"`
	CycleID         string    `json:"cycle_id,omitempty"`
	CycleStartedAt  time.Time `json:"cycle_started_at,omitempty"`
	StageStartedAt  time.Time `json:"stage_started_at"`
	LastCompletedAt time.Time `json:"last_completed_at,omitempty"`
	LastResult      string    `json:"last_result,omitempty"` // "success" or "error"
	LastError       string    `json:"last_error,omitempty"`
}

Status represents the current processing state of a component. It is used by the ADR-003 lifecycle status pattern for async component observability.

type ThrottledLifecycleReporter

type ThrottledLifecycleReporter struct {
	// contains filtered or unexported fields
}

ThrottledLifecycleReporter wraps any LifecycleReporter to enforce minimum intervals between KV writes. Important events (CycleComplete, CycleError) are never throttled.

func NewThrottledLifecycleReporter

func NewThrottledLifecycleReporter(delegate LifecycleReporter, minInterval time.Duration, logger *slog.Logger) *ThrottledLifecycleReporter

NewThrottledLifecycleReporter creates a throttled wrapper around any LifecycleReporter. minInterval is the minimum time between writes (default 1 second if zero).

func (*ThrottledLifecycleReporter) FlushPending

func (r *ThrottledLifecycleReporter) FlushPending(ctx context.Context) error

FlushPending writes any pending stage to storage immediately. Call this before component shutdown or when immediate status update is needed.

func (*ThrottledLifecycleReporter) ReportCycleComplete

func (r *ThrottledLifecycleReporter) ReportCycleComplete(ctx context.Context) error

ReportCycleComplete marks successful cycle completion. NEVER throttled - always written immediately. Also flushes any pending stage.

func (*ThrottledLifecycleReporter) ReportCycleError

func (r *ThrottledLifecycleReporter) ReportCycleError(ctx context.Context, err error) error

ReportCycleError marks cycle failure with error details. NEVER throttled - always written immediately. Also flushes any pending stage.

func (*ThrottledLifecycleReporter) ReportCycleStart

func (r *ThrottledLifecycleReporter) ReportCycleStart(ctx context.Context) error

ReportCycleStart marks the beginning of a new processing cycle. Throttled: queued if within minInterval window.

func (*ThrottledLifecycleReporter) ReportStage

func (r *ThrottledLifecycleReporter) ReportStage(ctx context.Context, stage string) error

ReportStage updates the component's current processing stage. Throttled: if within minInterval, queues stage for next write window.
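The throttling rules described above (write immediately when the window has elapsed, otherwise keep only the latest stage until it is flushed) can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: stageWriter, throttled, recorder, and demo are hypothetical names, the clock is injected so the example needs no sleeps, and a queued stage is only written on an explicit FlushPending, whereas the real wrapper may also write it when the next window opens.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// stageWriter is a hypothetical stand-in for the wrapped reporter.
type stageWriter interface {
	WriteStage(ctx context.Context, stage string) error
}

// throttled is an illustrative wrapper, not the package's implementation.
type throttled struct {
	mu          sync.Mutex
	delegate    stageWriter
	minInterval time.Duration
	now         func() time.Time // injectable clock (assumption for testability)
	lastWrite   time.Time
	pending     string
	hasPending  bool
}

func newThrottled(d stageWriter, minInterval time.Duration, now func() time.Time) *throttled {
	if minInterval <= 0 {
		// Documented default for zero; treating negatives the same is an assumption.
		minInterval = time.Second
	}
	return &throttled{delegate: d, minInterval: minInterval, now: now}
}

// ReportStage writes immediately if minInterval has elapsed since the last
// write; otherwise it queues the stage, replacing any earlier queued stage.
func (t *throttled) ReportStage(ctx context.Context, stage string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := t.now()
	if !t.lastWrite.IsZero() && now.Sub(t.lastWrite) < t.minInterval {
		t.pending, t.hasPending = stage, true
		return nil
	}
	t.hasPending = false
	t.lastWrite = now
	return t.delegate.WriteStage(ctx, stage)
}

// FlushPending writes the queued stage, if any, regardless of the window.
func (t *throttled) FlushPending(ctx context.Context) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if !t.hasPending {
		return nil
	}
	stage := t.pending
	t.hasPending = false
	t.lastWrite = t.now()
	return t.delegate.WriteStage(ctx, stage)
}

// recorder collects the stages that actually reach the delegate.
type recorder struct{ stages []string }

func (r *recorder) WriteStage(_ context.Context, stage string) error {
	r.stages = append(r.stages, stage)
	return nil
}

// demo reports three stages within one window and then flushes.
func demo() []string {
	clock := time.Date(2026, 3, 3, 12, 0, 0, 0, time.UTC)
	rec := &recorder{}
	t := newThrottled(rec, time.Second, func() time.Time { return clock })
	ctx := context.Background()

	_ = t.ReportStage(ctx, "fetching") // first report: written
	clock = clock.Add(100 * time.Millisecond)
	_ = t.ReportStage(ctx, "parsing") // inside window: queued
	clock = clock.Add(100 * time.Millisecond)
	_ = t.ReportStage(ctx, "indexing") // inside window: replaces "parsing"
	_ = t.FlushPending(ctx)            // writes "indexing"
	return rec.stages
}

func main() {
	fmt.Println(demo()) // prints [fetching indexing]
}
```

Only two of the three reported stages reach the delegate, which is the point of throttling: intermediate stages inside one window are coalesced into the most recent one.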

type TimerPort

type TimerPort struct {
	Interval  string             `json:"interval"` // Duration string e.g. "30s", "1m"
	Interface *InterfaceContract `json:"interface,omitempty"`
}

TimerPort represents a periodic timer trigger port
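The Interval examples ("30s", "1m") match the format accepted by the standard library's time.ParseDuration. Assuming that is how the string is meant to be interpreted, a caller could validate an interval before wiring a port as sketched below. The timerPort type mirrors only the Interval field, and parseInterval is a hypothetical helper rather than part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// timerPort mirrors only the Interval field of TimerPort for this sketch.
type timerPort struct {
	Interval string `json:"interval"`
}

// parseInterval (hypothetical helper) converts the Interval string into a
// time.Duration. It rejects non-positive durations because time.NewTicker
// panics when given one.
func parseInterval(p timerPort) (time.Duration, error) {
	d, err := time.ParseDuration(p.Interval)
	if err != nil {
		return 0, fmt.Errorf("invalid timer interval %q: %w", p.Interval, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("timer interval must be positive, got %s", d)
	}
	return d, nil
}

func main() {
	for _, in := range []string{"30s", "1m", "soon"} {
		d, err := parseInterval(timerPort{Interval: in})
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q parses to %s\n", in, d)
	}
}
```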

func (TimerPort) IsExclusive

func (t TimerPort) IsExclusive() bool

IsExclusive returns false as multiple timers can run independently

func (TimerPort) ResourceID

func (t TimerPort) ResourceID() string

ResourceID returns unique identifier for timer ports

func (TimerPort) Type

func (t TimerPort) Type() string

Type returns the port type identifier

type Validatable

type Validatable interface {
	Validate() error
}

Validatable is implemented by configs that can self-validate.
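A config type opts in by providing a Validate method. The sketch below redeclares the interface locally so it is self-contained; udpConfig and validateIfPossible are hypothetical and only illustrate how calling code might use a type assertion to run validation when it is available.

```go
package main

import (
	"errors"
	"fmt"
)

// Validatable is redeclared locally with the same method set as above.
type Validatable interface {
	Validate() error
}

// udpConfig is a hypothetical component config used for illustration.
type udpConfig struct {
	Host string
	Port int
}

// Validate implements Validatable with two simple checks.
func (c udpConfig) Validate() error {
	if c.Host == "" {
		return errors.New("host is required")
	}
	if c.Port < 1 || c.Port > 65535 {
		return fmt.Errorf("port %d is outside 1-65535", c.Port)
	}
	return nil
}

// validateIfPossible (hypothetical helper) runs Validate only when cfg
// implements Validatable; other values pass through unchecked.
func validateIfPossible(cfg any) error {
	if v, ok := cfg.(Validatable); ok {
		return v.Validate()
	}
	return nil
}

func main() {
	fmt.Println(validateIfPossible(udpConfig{Host: "0.0.0.0", Port: 14550}))
	fmt.Println(validateIfPossible(udpConfig{Host: "0.0.0.0", Port: 0}))
}
```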

type ValidationError

type ValidationError struct {
	Field   string `json:"field"`   // Name of the field that failed validation
	Message string `json:"message"` // Human-readable error message
	Code    string `json:"code"`    // Machine-readable error code (see list below)
}

ValidationError represents a validation error for a specific configuration field. It provides structured error information that can be displayed to users and mapped to specific form fields in the UI.

Error codes are standardized across frontend and backend:

  • "required": Field is required but missing
  • "min": Numeric value below minimum threshold
  • "max": Numeric value above maximum threshold
  • "enum": Value not in allowed enum values
  • "type": Value doesn't match expected type (string, int, bool, etc.)
  • "pattern": String doesn't match required pattern (future use)
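Because each error carries its Field, a UI layer can attach messages to individual form inputs. A minimal sketch of that mapping follows, using a local copy of the struct; groupByField is a hypothetical helper and the message strings are illustrative, not the text this package produces.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ValidationError is a local copy of the struct documented above.
type ValidationError struct {
	Field   string `json:"field"`
	Message string `json:"message"`
	Code    string `json:"code"`
}

// groupByField (hypothetical helper) indexes errors by field name so each
// form input can look up its own failures.
func groupByField(errs []ValidationError) map[string][]ValidationError {
	grouped := make(map[string][]ValidationError)
	for _, e := range errs {
		grouped[e.Field] = append(grouped[e.Field], e)
	}
	return grouped
}

func main() {
	// Illustrative messages; the codes come from the standardized list above.
	errs := []ValidationError{
		{Field: "port", Message: "port must be at most 65535", Code: "max"},
		{Field: "host", Message: "host is required", Code: "required"},
	}
	out, err := json.MarshalIndent(groupByField(errs), "", "  ")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```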

func ValidateConfig

func ValidateConfig(config map[string]any, schema ConfigSchema) []ValidationError

ValidateConfig validates a configuration map against a ConfigSchema. It checks required fields, type constraints, min/max bounds, and enum values.

Validation is lenient: unknown fields are allowed, to support backward compatibility and future schema evolution. Only explicitly defined properties are validated against their schema constraints.

Returns a slice of ValidationError containing all validation failures found. An empty slice indicates the configuration is valid.

Example usage:

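// ptrInt is assumed to be a caller-defined helper that returns a pointer
// to its argument; it is not shown in this example.
schema := component.ConfigSchema{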
    Properties: map[string]component.PropertySchema{
        "port": {
            Type:     "int",
            Minimum:  ptrInt(1),
            Maximum:  ptrInt(65535),
            Category: "basic",
        },
    },
    Required: []string{"port"},
}

config := map[string]any{"port": 99999}
// Named errs rather than errors to avoid shadowing the standard errors package.
errs := component.ValidateConfig(config, schema)
if len(errs) > 0 {
    // Handle validation errors
    fmt.Printf("Validation failed: %s\n", errs[0].Message)
}

Directories

Path Synopsis
Package flowgraph provides flow graph analysis and validation for component connections.
