Documentation
¶
Overview ¶
Package command defines the Command domain entity for server-controlled agents.
Index ¶
- Constants
- type Command
- func (c *Command) Acknowledge()
- func (c *Command) AssignToPlatformAgent(agentID shared.ID)
- func (c *Command) CanAcceptIngest(agentID shared.ID, token string) bool
- func (c *Command) CanBeAcknowledged() bool
- func (c *Command) CanRetry(maxRetries int) bool
- func (c *Command) Cancel()
- func (c *Command) ClearAuthToken()
- func (c *Command) Complete(result json.RawMessage)
- func (c *Command) Expire()
- func (c *Command) Fail(errorMessage string)
- func (c *Command) GenerateAuthToken(ttl time.Duration) (string, error)
- func (c *Command) IsAuthTokenValid() bool
- func (c *Command) IsDispatchedToPlatformAgent() bool
- func (c *Command) IsExpired() bool
- func (c *Command) IsPending() bool
- func (c *Command) IsQueued() bool
- func (c *Command) ReturnToQueue()
- func (c *Command) SetAgentID(agentID shared.ID)
- func (c *Command) SetExpiration(expiresAt time.Time)
- func (c *Command) SetPlatformJob(queuePriority int)
- func (c *Command) SetStepRunID(stepRunID shared.ID)
- func (c *Command) Start()
- func (c *Command) UpdateQueuePriority(newPriority int)
- func (c *Command) VerifyAuthToken(token string) bool
- type CommandPriority
- type CommandStats
- type CommandStatus
- type CommandType
- type Filter
- type QueuePosition
- type Repository
Constants ¶
const ( // AuthTokenLength is the length of generated auth tokens (32 bytes = 64 hex chars) AuthTokenLength = 32 // AuthTokenPrefix is the prefix for auth tokens AuthTokenPrefix = "oc-cmd-" // DefaultAuthTokenTTL is the default time-to-live for auth tokens (24 hours) DefaultAuthTokenTTL = 24 * time.Hour )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Command ¶
type Command struct {
ID shared.ID
TenantID shared.ID
AgentID *shared.ID // Target agent (nil = any agent can pick up)
Type CommandType
Priority CommandPriority
Payload json.RawMessage
Status CommandStatus
ErrorMessage string
// Timing
CreatedAt time.Time
ExpiresAt *time.Time
AcknowledgedAt *time.Time
StartedAt *time.Time
CompletedAt *time.Time
// Result
Result json.RawMessage
// Scheduling
ScheduledAt *time.Time
ScheduleID *shared.ID
// Pipeline tracking
StepRunID *shared.ID // Reference to pipeline step run (for progression tracking)
// IsPlatformJob indicates this job runs on a platform agent (not tenant's own agent)
IsPlatformJob bool
// PlatformAgentID is the platform agent assigned to execute this job (auto-selected)
PlatformAgentID *shared.ID
// AuthTokenHash is the SHA256 hash of the auth token (for verification)
AuthTokenHash string
// AuthTokenPrefix is the first 8 characters of the token (for logging/debugging)
AuthTokenPrefix string
// AuthTokenExpiresAt is when the auth token expires (typically 24h after creation)
AuthTokenExpiresAt *time.Time
// QueuePriority is the calculated priority score (plan_base + age_bonus)
// Higher value = processed first
QueuePriority int
// QueuedAt is when the job was added to the platform queue
QueuedAt *time.Time
// DispatchAttempts tracks how many times dispatch was attempted
DispatchAttempts int
// LastDispatchAt is the last time dispatch was attempted
LastDispatchAt *time.Time
}
Command represents a command to be executed by an agent.
func NewCommand ¶
func NewCommand(tenantID shared.ID, cmdType CommandType, priority CommandPriority, payload json.RawMessage) (*Command, error)
NewCommand creates a new Command entity.
func (*Command) Acknowledge ¶
func (c *Command) Acknowledge()
Acknowledge marks the command as acknowledged.
func (*Command) AssignToPlatformAgent ¶
AssignToPlatformAgent assigns this job to a specific platform agent.
func (*Command) CanAcceptIngest ¶
CanAcceptIngest checks if this command can accept ingest data from a platform agent. The command must be running, have a valid token, and match the agent.
func (*Command) CanBeAcknowledged ¶
CanBeAcknowledged checks if the command can be acknowledged.
func (*Command) ClearAuthToken ¶
func (c *Command) ClearAuthToken()
ClearAuthToken clears the auth token (call after command completes).
func (*Command) Complete ¶
func (c *Command) Complete(result json.RawMessage)
Complete marks the command as completed.
func (*Command) GenerateAuthToken ¶
GenerateAuthToken generates a new auth token for this command. Returns the raw token (to be sent to agent) and sets the hash on the command. The raw token should only be transmitted once and never stored.
func (*Command) IsAuthTokenValid ¶
IsAuthTokenValid checks if the auth token is still valid (not expired).
func (*Command) IsDispatchedToPlatformAgent ¶
IsDispatchedToPlatformAgent checks if this job has been dispatched to a platform agent.
func (*Command) ReturnToQueue ¶
func (c *Command) ReturnToQueue()
ReturnToQueue returns the job to the queue (e.g., if agent went offline).
func (*Command) SetAgentID ¶
SetAgentID sets the target agent ID.
func (*Command) SetExpiration ¶
SetExpiration sets the expiration time.
func (*Command) SetPlatformJob ¶
SetPlatformJob marks this command as a platform job and enqueues it.
func (*Command) SetStepRunID ¶
SetStepRunID sets the pipeline step run ID for tracking.
func (*Command) UpdateQueuePriority ¶
UpdateQueuePriority updates the queue priority (called by scheduler).
func (*Command) VerifyAuthToken ¶
VerifyAuthToken verifies if the provided token matches this command's token. Uses constant-time comparison to prevent timing attacks.
type CommandPriority ¶
type CommandPriority string
CommandPriority represents the priority of a command.
const ( CommandPriorityLow CommandPriority = "low" CommandPriorityNormal CommandPriority = "normal" CommandPriorityHigh CommandPriority = "high" CommandPriorityCritical CommandPriority = "critical" )
type CommandStats ¶
type CommandStats struct {
Total int64
Pending int64
Running int64
Completed int64
Failed int64
Canceled int64
}
CommandStats represents aggregated command statistics.
type CommandStatus ¶
type CommandStatus string
CommandStatus represents the status of a command.
const ( CommandStatusPending CommandStatus = "pending" CommandStatusAcknowledged CommandStatus = "acknowledged" CommandStatusRunning CommandStatus = "running" CommandStatusCompleted CommandStatus = "completed" CommandStatusFailed CommandStatus = "failed" CommandStatusCanceled CommandStatus = "canceled" CommandStatusExpired CommandStatus = "expired" )
type CommandType ¶
type CommandType string
CommandType represents the type of command.
const ( CommandTypeScan CommandType = "scan" CommandTypeCollect CommandType = "collect" CommandTypeHealthCheck CommandType = "health_check" CommandTypeConfigUpdate CommandType = "config_update" CommandTypeCancel CommandType = "cancel" )
type Filter ¶
type Filter struct {
TenantID *shared.ID
AgentID *shared.ID
Type *CommandType
Status *CommandStatus
Priority *CommandPriority
IsPlatformJob *bool // Filter by platform job status (v3.2)
PlatformAgentID *shared.ID // Filter by assigned platform agent (v3.2)
}
Filter represents filter options for listing commands.
type QueuePosition ¶
type QueuePosition struct {
Position int `json:"position"` // Position in queue (1-based)
TotalQueued int `json:"total_queued"` // Total jobs in queue
Priority int `json:"priority"` // Current priority score
EstimatedWait time.Duration `json:"estimated_wait"` // Estimated wait time
}
QueuePosition represents a position in the platform job queue.
func (*QueuePosition) EstimateWaitTime ¶
func (q *QueuePosition) EstimateWaitTime(avgJobDuration time.Duration, availableAgents int) time.Duration
EstimateWaitTime estimates the wait time based on position and historical data.
type Repository ¶
type Repository interface {
// Create creates a new command.
Create(ctx context.Context, cmd *Command) error
// GetByID retrieves a command by ID.
GetByID(ctx context.Context, id shared.ID) (*Command, error)
// GetByTenantAndID retrieves a command by tenant and ID.
GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Command, error)
// GetPendingForAgent retrieves pending commands for an agent.
GetPendingForAgent(ctx context.Context, tenantID shared.ID, agentID *shared.ID, limit int) ([]*Command, error)
// List lists commands with filters and pagination.
List(ctx context.Context, filter Filter, page pagination.Pagination) (pagination.Result[*Command], error)
// Update updates a command.
Update(ctx context.Context, cmd *Command) error
// Delete deletes a command.
Delete(ctx context.Context, id shared.ID) error
// ExpireOldCommands expires commands that have passed their expiration time.
ExpireOldCommands(ctx context.Context) (int64, error)
// FindExpired finds commands that have expired but not yet marked as expired.
FindExpired(ctx context.Context) ([]*Command, error)
// GetByAuthTokenHash retrieves a command by auth token hash.
GetByAuthTokenHash(ctx context.Context, hash string) (*Command, error)
// CountActivePlatformJobsByTenant counts active platform jobs for a tenant.
// Active = pending, acknowledged, or running.
CountActivePlatformJobsByTenant(ctx context.Context, tenantID shared.ID) (int, error)
// CountQueuedPlatformJobsByTenant counts queued (pending, not dispatched) platform jobs for a tenant.
CountQueuedPlatformJobsByTenant(ctx context.Context, tenantID shared.ID) (int, error)
// CountQueuedPlatformJobs counts all queued platform jobs across all tenants.
CountQueuedPlatformJobs(ctx context.Context) (int, error)
// GetQueuedPlatformJobs retrieves queued platform jobs ordered by priority.
// Returns jobs that are pending and not yet assigned to an agent.
GetQueuedPlatformJobs(ctx context.Context, limit int) ([]*Command, error)
// GetNextPlatformJob atomically claims the next job from the queue for an agent.
// Uses FOR UPDATE SKIP LOCKED for concurrent safety.
// Returns nil if no suitable job is available.
GetNextPlatformJob(ctx context.Context, agentID shared.ID, capabilities []string, tools []string) (*Command, error)
// UpdateQueuePriorities recalculates queue priorities for all pending platform jobs.
// Returns the number of jobs updated.
UpdateQueuePriorities(ctx context.Context) (int64, error)
// RecoverStuckJobs returns stuck jobs to the queue.
// A job is stuck if it's assigned but the agent is offline or hasn't progressed.
// Returns the number of jobs recovered.
RecoverStuckJobs(ctx context.Context, stuckThresholdMinutes int, maxRetries int) (int64, error)
// ExpireOldPlatformJobs expires platform jobs that have been in queue too long.
// Returns the number of jobs expired.
ExpireOldPlatformJobs(ctx context.Context, maxQueueMinutes int) (int64, error)
// GetQueuePosition gets the queue position for a specific command.
GetQueuePosition(ctx context.Context, commandID shared.ID) (*QueuePosition, error)
// ListPlatformJobsByTenant lists platform jobs for a tenant.
ListPlatformJobsByTenant(ctx context.Context, tenantID shared.ID, page pagination.Pagination) (pagination.Result[*Command], error)
// ListPlatformJobsAdmin lists platform jobs across all tenants (admin only).
// Optional filters: agentID, tenantID, status.
ListPlatformJobsAdmin(ctx context.Context, agentID, tenantID *shared.ID, status *CommandStatus, page pagination.Pagination) (pagination.Result[*Command], error)
// GetPlatformJobsByAgent lists platform jobs assigned to an agent.
GetPlatformJobsByAgent(ctx context.Context, agentID shared.ID, status *CommandStatus) ([]*Command, error)
// RecoverStuckTenantCommands returns stuck tenant commands to the pool.
// A command is stuck if it's assigned to an offline agent or hasn't been picked up.
// Returns the number of commands recovered.
RecoverStuckTenantCommands(ctx context.Context, stuckThresholdMinutes int, maxRetries int) (int64, error)
// FailExhaustedCommands marks commands that exceeded max retries as failed.
// Returns the number of commands failed.
FailExhaustedCommands(ctx context.Context, maxRetries int) (int64, error)
// GetStatsByTenant returns aggregated command statistics for a tenant in a single query.
// This is optimized to avoid N queries when fetching stats.
GetStatsByTenant(ctx context.Context, tenantID shared.ID) (CommandStats, error)
}
Repository defines the interface for command persistence.