command

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package command defines the Command domain entity for server-controlled agents.

Index

Constants

View Source
// Auth-token parameters shared by the command package.
const (
	// AuthTokenLength is the length of generated auth tokens in bytes
	// (32 bytes = 64 hex chars).
	AuthTokenLength = 32

	// AuthTokenPrefix is the prefix for auth tokens.
	// NOTE(review): this constant shares its name with the Command.AuthTokenPrefix
	// field, which holds a per-token value; the two are distinct.
	AuthTokenPrefix = "oc-cmd-"

	// DefaultAuthTokenTTL is the default time-to-live for auth tokens (24 hours).
	DefaultAuthTokenTTL = 24 * time.Hour
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

type Command struct {
	// Identity: the command's own ID, the owning tenant, and an optional target agent.
	ID       shared.ID
	TenantID shared.ID
	AgentID  *shared.ID // Target agent (nil = any agent can pick up)

	// What to run. These correspond to the cmdType, priority and payload
	// parameters of NewCommand (presumably copied from them — confirm).
	Type     CommandType
	Priority CommandPriority
	Payload  json.RawMessage

	// Status is the lifecycle state, one of the CommandStatus* constants.
	// ErrorMessage is presumably populated by Fail — TODO confirm.
	Status       CommandStatus
	ErrorMessage string

	// Timing
	// The pointer timestamps are optional; presumably nil until the matching
	// lifecycle event has happened — confirm against the methods.
	CreatedAt      time.Time
	ExpiresAt      *time.Time
	AcknowledgedAt *time.Time
	StartedAt      *time.Time
	CompletedAt    *time.Time

	// Result
	// Raw JSON result; presumably the value passed to Complete — TODO confirm.
	Result json.RawMessage

	// Scheduling
	ScheduledAt *time.Time
	ScheduleID  *shared.ID

	// Pipeline tracking
	StepRunID *shared.ID // Reference to pipeline step run (for progression tracking)

	// IsPlatformJob indicates this job runs on a platform agent (not tenant's own agent)
	IsPlatformJob bool

	// PlatformAgentID is the platform agent assigned to execute this job (auto-selected)
	PlatformAgentID *shared.ID

	// AuthTokenHash is the SHA256 hash of the auth token (for verification)
	AuthTokenHash string

	// AuthTokenPrefix is the first 8 characters of the token (for logging/debugging)
	// NOTE(review): the package-level AuthTokenPrefix constant ("oc-cmd-") is 7
	// characters long; if raw tokens start with it, only one distinguishing
	// character is kept here — confirm this is intended.
	AuthTokenPrefix string

	// AuthTokenExpiresAt is when the auth token expires (typically 24h after creation)
	AuthTokenExpiresAt *time.Time

	// QueuePriority is the calculated priority score (plan_base + age_bonus)
	// Higher value = processed first
	QueuePriority int

	// QueuedAt is when the job was added to the platform queue
	QueuedAt *time.Time

	// DispatchAttempts tracks how many times dispatch was attempted
	DispatchAttempts int

	// LastDispatchAt is the last time dispatch was attempted
	LastDispatchAt *time.Time
}

Command represents a command to be executed by an agent.

func NewCommand

func NewCommand(tenantID shared.ID, cmdType CommandType, priority CommandPriority, payload json.RawMessage) (*Command, error)

NewCommand creates a new Command entity.

func (*Command) Acknowledge

func (c *Command) Acknowledge()

Acknowledge marks the command as acknowledged.

func (*Command) AssignToPlatformAgent

func (c *Command) AssignToPlatformAgent(agentID shared.ID)

AssignToPlatformAgent assigns this job to a specific platform agent.

func (*Command) CanAcceptIngest

func (c *Command) CanAcceptIngest(agentID shared.ID, token string) bool

CanAcceptIngest checks if this command can accept ingest data from a platform agent. The command must be running, have a valid token, and match the agent.

func (*Command) CanBeAcknowledged

func (c *Command) CanBeAcknowledged() bool

CanBeAcknowledged checks if the command can be acknowledged.

func (*Command) CanRetry

func (c *Command) CanRetry(maxRetries int) bool

CanRetry checks if this job can be retried after failure.

func (*Command) Cancel

func (c *Command) Cancel()

Cancel marks the command as canceled.

func (*Command) ClearAuthToken

func (c *Command) ClearAuthToken()

ClearAuthToken clears the auth token (call after command completes).

func (*Command) Complete

func (c *Command) Complete(result json.RawMessage)

Complete marks the command as completed.

func (*Command) Expire

func (c *Command) Expire()

Expire marks the command as expired.

func (*Command) Fail

func (c *Command) Fail(errorMessage string)

Fail marks the command as failed.

func (*Command) GenerateAuthToken

func (c *Command) GenerateAuthToken(ttl time.Duration) (string, error)

GenerateAuthToken generates a new auth token for this command. Returns the raw token (to be sent to agent) and sets the hash on the command. The raw token should only be transmitted once and never stored.

func (*Command) IsAuthTokenValid

func (c *Command) IsAuthTokenValid() bool

IsAuthTokenValid checks if the auth token is still valid (not expired).

func (*Command) IsDispatchedToPlatformAgent

func (c *Command) IsDispatchedToPlatformAgent() bool

IsDispatchedToPlatformAgent checks if this job has been dispatched to a platform agent.

func (*Command) IsExpired

func (c *Command) IsExpired() bool

IsExpired checks if the command has expired.

func (*Command) IsPending

func (c *Command) IsPending() bool

IsPending checks if the command is pending.

func (*Command) IsQueued

func (c *Command) IsQueued() bool

IsQueued checks if this job is in the queue waiting for dispatch.

func (*Command) ReturnToQueue

func (c *Command) ReturnToQueue()

ReturnToQueue returns the job to the queue (e.g., if agent went offline).

func (*Command) SetAgentID

func (c *Command) SetAgentID(agentID shared.ID)

SetAgentID sets the target agent ID.

func (*Command) SetExpiration

func (c *Command) SetExpiration(expiresAt time.Time)

SetExpiration sets the expiration time.

func (*Command) SetPlatformJob

func (c *Command) SetPlatformJob(queuePriority int)

SetPlatformJob marks this command as a platform job and enqueues it.

func (*Command) SetStepRunID

func (c *Command) SetStepRunID(stepRunID shared.ID)

SetStepRunID sets the pipeline step run ID for tracking.

func (*Command) Start

func (c *Command) Start()

Start marks the command as running.

func (*Command) UpdateQueuePriority

func (c *Command) UpdateQueuePriority(newPriority int)

UpdateQueuePriority updates the queue priority (called by scheduler).

func (*Command) VerifyAuthToken

func (c *Command) VerifyAuthToken(token string) bool

VerifyAuthToken verifies if the provided token matches this command's token. Uses constant-time comparison to prevent timing attacks.

type CommandPriority

// CommandPriority is the string-backed priority label of a command.
type CommandPriority string

CommandPriority represents the priority of a command.

// Priority labels for commands.
// Because the values are strings, comparing them with < or > orders them
// lexically ("critical" < "high" < "low" < "normal"), not by urgency.
const (
	CommandPriorityLow      CommandPriority = "low"
	CommandPriorityNormal   CommandPriority = "normal"
	CommandPriorityHigh     CommandPriority = "high"
	CommandPriorityCritical CommandPriority = "critical"
)

type CommandStats

// CommandStats holds aggregated per-status command counts.
// NOTE(review): there are no counters for the acknowledged and expired
// statuses; whether Total includes commands in those states cannot be
// determined from this declaration — confirm against the query.
type CommandStats struct {
	Total     int64
	Pending   int64
	Running   int64
	Completed int64
	Failed    int64
	Canceled  int64
}

CommandStats represents aggregated command statistics.

type CommandStatus

// CommandStatus is the string-backed lifecycle status of a command.
type CommandStatus string

CommandStatus represents the status of a command.

// Lifecycle statuses of a command. Per the method documentation, Acknowledge,
// Start, Complete, Fail, Cancel and Expire mark a command as acknowledged,
// running, completed, failed, canceled and expired respectively.
// NOTE(review): pending is presumably the initial status set by NewCommand — confirm.
const (
	CommandStatusPending      CommandStatus = "pending"
	CommandStatusAcknowledged CommandStatus = "acknowledged"
	CommandStatusRunning      CommandStatus = "running"
	CommandStatusCompleted    CommandStatus = "completed"
	CommandStatusFailed       CommandStatus = "failed"
	CommandStatusCanceled     CommandStatus = "canceled"
	CommandStatusExpired      CommandStatus = "expired"
)

type CommandType

// CommandType is the string-backed kind of work a command asks an agent to do.
type CommandType string

CommandType represents the type of command.

// Known command types.
// NOTE(review): the shape of Command.Payload expected for each type is not
// visible here — document it alongside the handlers.
const (
	CommandTypeScan         CommandType = "scan"
	CommandTypeCollect      CommandType = "collect"
	CommandTypeHealthCheck  CommandType = "health_check"
	CommandTypeConfigUpdate CommandType = "config_update"
	CommandTypeCancel       CommandType = "cancel"
)

type Filter

// Filter holds optional criteria for listing commands (see Repository.List).
// Every field is a pointer; presumably a nil field means "do not filter on
// this attribute" — TODO confirm against the repository implementation.
type Filter struct {
	TenantID        *shared.ID
	AgentID         *shared.ID
	Type            *CommandType
	Status          *CommandStatus
	Priority        *CommandPriority
	IsPlatformJob   *bool      // Filter by platform job status (v3.2)
	PlatformAgentID *shared.ID // Filter by assigned platform agent (v3.2)
}

Filter represents filter options for listing commands.

type QueuePosition

// QueuePosition describes where a command sits in the platform job queue.
// EstimatedWait is a time.Duration, so encoding/json serializes it as an
// integer number of nanoseconds rather than a human-readable string.
type QueuePosition struct {
	Position      int           `json:"position"`       // Position in queue (1-based)
	TotalQueued   int           `json:"total_queued"`   // Total jobs in queue
	Priority      int           `json:"priority"`       // Current priority score
	EstimatedWait time.Duration `json:"estimated_wait"` // Estimated wait time
}

QueuePosition represents a position in the platform job queue.

func (*QueuePosition) EstimateWaitTime

func (q *QueuePosition) EstimateWaitTime(avgJobDuration time.Duration, availableAgents int) time.Duration

EstimateWaitTime estimates the wait time based on position and historical data.

type Repository

type Repository interface {
	// Create creates a new command.
	Create(ctx context.Context, cmd *Command) error

	// GetByID retrieves a command by ID.
	// NOTE(review): not tenant-scoped; prefer GetByTenantAndID where tenant
	// isolation matters.
	GetByID(ctx context.Context, id shared.ID) (*Command, error)

	// GetByTenantAndID retrieves a command by tenant and ID.
	GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Command, error)

	// GetPendingForAgent retrieves pending commands for an agent.
	// agentID is a pointer and may be nil; the meaning of nil (presumably
	// "commands not targeted at a specific agent") is not visible here — TODO confirm.
	// limit caps the number of commands returned.
	GetPendingForAgent(ctx context.Context, tenantID shared.ID, agentID *shared.ID, limit int) ([]*Command, error)

	// List lists commands with filters and pagination.
	List(ctx context.Context, filter Filter, page pagination.Pagination) (pagination.Result[*Command], error)

	// Update updates a command.
	Update(ctx context.Context, cmd *Command) error

	// Delete deletes a command.
	// NOTE(review): takes only the command ID, with no tenant argument — callers
	// presumably must verify ownership first; confirm.
	Delete(ctx context.Context, id shared.ID) error

	// ExpireOldCommands expires commands that have passed their expiration time.
	// NOTE(review): the int64 result is presumably the number of commands
	// expired — confirm.
	ExpireOldCommands(ctx context.Context) (int64, error)

	// FindExpired finds commands that have expired but have not yet been marked as expired.
	FindExpired(ctx context.Context) ([]*Command, error)

	// GetByAuthTokenHash retrieves a command by auth token hash.
	GetByAuthTokenHash(ctx context.Context, hash string) (*Command, error)

	// CountActivePlatformJobsByTenant counts active platform jobs for a tenant.
	// Active = pending, acknowledged, or running.
	CountActivePlatformJobsByTenant(ctx context.Context, tenantID shared.ID) (int, error)

	// CountQueuedPlatformJobsByTenant counts queued (pending, not dispatched) platform jobs for a tenant.
	CountQueuedPlatformJobsByTenant(ctx context.Context, tenantID shared.ID) (int, error)

	// CountQueuedPlatformJobs counts all queued platform jobs across all tenants.
	CountQueuedPlatformJobs(ctx context.Context) (int, error)

	// GetQueuedPlatformJobs retrieves queued platform jobs ordered by priority.
	// Returns jobs that are pending and not yet assigned to an agent.
	GetQueuedPlatformJobs(ctx context.Context, limit int) ([]*Command, error)

	// GetNextPlatformJob atomically claims the next job from the queue for an agent.
	// Uses FOR UPDATE SKIP LOCKED for concurrent safety.
	// Returns nil if no suitable job is available.
	// NOTE(review): how capabilities and tools are matched against a job is not
	// visible here — document it in the implementation.
	GetNextPlatformJob(ctx context.Context, agentID shared.ID, capabilities []string, tools []string) (*Command, error)

	// UpdateQueuePriorities recalculates queue priorities for all pending platform jobs.
	// Returns the number of jobs updated.
	UpdateQueuePriorities(ctx context.Context) (int64, error)

	// RecoverStuckJobs returns stuck jobs to the queue.
	// A job is stuck if it's assigned but the agent is offline or hasn't progressed.
	// stuckThresholdMinutes is expressed in minutes.
	// Returns the number of jobs recovered.
	RecoverStuckJobs(ctx context.Context, stuckThresholdMinutes int, maxRetries int) (int64, error)

	// ExpireOldPlatformJobs expires platform jobs that have been in queue too long.
	// maxQueueMinutes is expressed in minutes.
	// Returns the number of jobs expired.
	ExpireOldPlatformJobs(ctx context.Context, maxQueueMinutes int) (int64, error)

	// GetQueuePosition gets the queue position for a specific command.
	GetQueuePosition(ctx context.Context, commandID shared.ID) (*QueuePosition, error)

	// ListPlatformJobsByTenant lists platform jobs for a tenant.
	ListPlatformJobsByTenant(ctx context.Context, tenantID shared.ID, page pagination.Pagination) (pagination.Result[*Command], error)

	// ListPlatformJobsAdmin lists platform jobs across all tenants (admin only).
	// Optional filters: agentID, tenantID, status.
	ListPlatformJobsAdmin(ctx context.Context, agentID, tenantID *shared.ID, status *CommandStatus, page pagination.Pagination) (pagination.Result[*Command], error)

	// GetPlatformJobsByAgent lists platform jobs assigned to an agent.
	// status is optional; presumably nil means any status — TODO confirm.
	GetPlatformJobsByAgent(ctx context.Context, agentID shared.ID, status *CommandStatus) ([]*Command, error)

	// RecoverStuckTenantCommands returns stuck tenant commands to the pool.
	// A command is stuck if it's assigned to an offline agent or hasn't been picked up.
	// stuckThresholdMinutes is expressed in minutes.
	// Returns the number of commands recovered.
	RecoverStuckTenantCommands(ctx context.Context, stuckThresholdMinutes int, maxRetries int) (int64, error)

	// FailExhaustedCommands marks commands that exceeded max retries as failed.
	// Returns the number of commands failed.
	FailExhaustedCommands(ctx context.Context, maxRetries int) (int64, error)

	// GetStatsByTenant returns aggregated command statistics for a tenant in a single query.
	// This is optimized to avoid N queries when fetching stats.
	GetStatsByTenant(ctx context.Context, tenantID shared.ID) (CommandStats, error)
}

Repository defines the interface for command persistence.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL