jobscheduler

Version: v0.0.3
Published: Apr 19, 2025 License: MIT Imports: 15 Imported by: 0

README

Job Scheduler

A lightweight, persistent, and concurrency-aware job scheduler for Go.
It registers, schedules, executes, and manages cron-based jobs with metadata, an optional web UI, and persistent storage.

✅ Built with gorm and robfig/cron/v3, and designed to be integrated into any Go application as a library.

✨ Features

  • ⏰ Cron-based job scheduling
  • 💾 Persistent storage (via GORM)
  • 🔁 Automatic job restoration on restart
  • 🔒 Concurrency control via semaphore
  • 🧠 Reflection-based function registration
  • 🧩 Idempotent job definitions (safe to call on each boot)
  • 📈 Job execution tracking (status, latency, failure count)
  • 🖥️ Optional web-based UI for monitoring (toggle via config)

📦 Installation

go get github.com/Ctere1/jobscheduler

⚙️ Initialization Example

import (
	"log"
	"time"

	"github.com/Ctere1/jobscheduler"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

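func main() {
	// Connect to PostgreSQL through GORM; the scheduler persists jobs in this database.
	dsn := "host=localhost user=postgres password=postgres dbname=jobscheduler port=5432 sslmode=disable"
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("failed to connect database: %v", err)
	}

	scheduler, err := jobscheduler.New(db, jobscheduler.Config{
		MaxConcurrentJobs: 5,
		JobTimeout:        30 * time.Second,
		EnableWebUI:       true,
		WebListen:         "127.0.0.1:8080",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Schedule returns (*Job, error); check the error instead of discarding it.
	if _, err := scheduler.Schedule("daily-cleanup", "0 3 * * *", cleanupTask, nil); err != nil {
		log.Fatal(err)
	}

	// Block forever so scheduled jobs keep running.
	select {}
}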

func cleanupTask() {
	log.Println("Running cleanup...")
}

🧠 How It Works

1. Job Registration

Jobs are registered with a name, cron spec, function, and optional metadata. The function is registered using reflection and stored by name.
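
The registration step can be illustrated with a minimal, standard-library-only sketch. It assumes the function name is derived with reflect and runtime.FuncForPC; the registry type and its register and lookup methods are hypothetical names for illustration, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"runtime"
	"sync"
)

// registry maps a function's runtime name to the function itself.
// Hypothetical type for illustration; not the package's internal registry.
type registry struct {
	mu    sync.RWMutex
	funcs map[string]func()
}

func newRegistry() *registry {
	return &registry{funcs: make(map[string]func())}
}

// register derives a name for fn via reflection and stores fn under that name.
func (r *registry) register(fn any) (string, error) {
	if fn == nil {
		return "", errors.New("job function cannot be nil")
	}
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func || v.IsNil() {
		return "", errors.New("job function must be a valid function")
	}
	f, ok := fn.(func())
	if !ok {
		return "", errors.New("this sketch only supports func()")
	}
	info := runtime.FuncForPC(v.Pointer())
	if info == nil || info.Name() == "" {
		return "", errors.New("could not determine function name")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.funcs[info.Name()] = f
	return info.Name(), nil
}

// lookup returns the function stored under name, if any.
func (r *registry) lookup(name string) (func(), bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	f, ok := r.funcs[name]
	return f, ok
}

func cleanupTask() { fmt.Println("Running cleanup...") }

func main() {
	reg := newRegistry()
	name, err := reg.register(cleanupTask)
	if err != nil {
		panic(err)
	}
	fmt.Println("registered as", name)
	if fn, ok := reg.lookup(name); ok {
		fn()
	}
}
```

With this approach, anonymous functions receive compiler-generated names that depend on where they appear in the source, so named top-level functions are likely the safer choice for jobs that must be found again after a restart.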

2. Persistence

All jobs are persisted to the database (gorm-compatible) using the Job model. On startup, jobs are restored and scheduled automatically.

3. Concurrency Control

Job concurrency is managed using a buffered channel (chan struct{}) as a semaphore. This allows you to set the maximum number of parallel jobs via MaxConcurrentJobs.
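
A buffered channel used as a semaphore can be sketched as follows. The semaphore type and the non-blocking tryAcquire are assumptions made for illustration; whether the scheduler blocks or rejects a job when the limit is reached is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrencyLimit mirrors the message of the package's ErrConcurrencyLimit.
var errConcurrencyLimit = errors.New("concurrency limit reached")

// semaphore is a counting semaphore backed by a buffered channel.
// Each value in the buffer represents one running job.
type semaphore chan struct{}

func newSemaphore(maxConcurrent int) semaphore {
	return make(semaphore, maxConcurrent)
}

// tryAcquire takes a slot if one is free and fails immediately otherwise.
func (s semaphore) tryAcquire() error {
	select {
	case s <- struct{}{}:
		return nil
	default:
		return errConcurrencyLimit
	}
}

// release frees a slot previously taken with tryAcquire.
func (s semaphore) release() { <-s }

func main() {
	sem := newSemaphore(2)
	for i := 1; i <= 3; i++ {
		if err := sem.tryAcquire(); err != nil {
			fmt.Printf("job %d rejected: %v\n", i, err)
			continue
		}
		fmt.Printf("job %d admitted\n", i)
	}
	sem.release()
	fmt.Println("slot free after release:", sem.tryAcquire() == nil)
}
```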

4. Timeouts

Jobs can have a maximum execution time. If they exceed JobTimeout, they are marked as failed.
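
One common way to enforce such a limit is context.WithTimeout combined with a select. The sketch below assumes that approach; runWithTimeout, quickJob, slowJob, and demoTimeout are hypothetical names, and the package's actual mechanism may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errJobTimeout mirrors the message of the package's ErrJobTimeout.
var errJobTimeout = errors.New("job execution timed out")

// demoTimeout is a short limit chosen only for this demonstration.
const demoTimeout = 50 * time.Millisecond

// runWithTimeout runs fn in a goroutine and reports errJobTimeout
// if fn has not returned before the timeout elapses.
func runWithTimeout(fn func(), timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return errJobTimeout
	}
}

func quickJob() {}

func slowJob() { time.Sleep(10 * demoTimeout) }

func main() {
	fmt.Println("quick job:", runWithTimeout(quickJob, demoTimeout))
	fmt.Println("slow job:", runWithTimeout(slowJob, demoTimeout))
}
```

In this sketch the job's goroutine is not stopped when the timeout fires, because Go cannot forcibly kill a goroutine; the caller only stops waiting. A job that must stop promptly would need to accept a context and check it.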

5. Rescheduling

Calling Schedule() with the name of an existing job updates that job's cron spec and metadata instead of creating a duplicate, which makes boot-time job declarations safe and idempotent.
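
The create-or-update behavior can be pictured with an in-memory table keyed by job name. This is a simplified sketch under that assumption; jobRecord, jobTable, and upsert are hypothetical names, and the real scheduler persists jobs through GORM rather than a map.

```go
package main

import "fmt"

// jobRecord holds the fields that a repeated declaration may change.
type jobRecord struct {
	Name string
	Spec string
	Data map[string]any
}

// jobTable stores one record per job name. It is not safe for concurrent use.
type jobTable struct {
	byName map[string]*jobRecord
}

func newJobTable() *jobTable {
	return &jobTable{byName: make(map[string]*jobRecord)}
}

// upsert creates the job if the name is new, or updates the spec and metadata
// of the existing job. The boolean reports whether a new record was created.
func (t *jobTable) upsert(name, spec string, data map[string]any) (*jobRecord, bool) {
	if existing, ok := t.byName[name]; ok {
		existing.Spec = spec
		existing.Data = data
		return existing, false
	}
	rec := &jobRecord{Name: name, Spec: spec, Data: data}
	t.byName[name] = rec
	return rec, true
}

func main() {
	table := newJobTable()

	_, created := table.upsert("daily-cleanup", "0 3 * * *", nil)
	fmt.Println("first boot, created:", created)

	// A later boot declares the same job with a different spec.
	rec, created := table.upsert("daily-cleanup", "0 4 * * *", nil)
	fmt.Println("second boot, created:", created, "spec:", rec.Spec)
	fmt.Println("jobs stored:", len(table.byName))
}
```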

🔧 Configuration

The scheduler can be configured using the Config struct. Here are the available options:

  • MaxConcurrentJobs: Maximum number of jobs that can run concurrently. Default is 10 (DefaultMaxConcurrentJobs).
  • JobTimeout: Maximum time a job can run before being terminated. Default is 30 seconds (DefaultJobTimeout).
  • EnableWebUI: Enable or disable the built-in web UI. Default is false.
  • WebListen: Address for the web UI to listen on. Default is "localhost:8080" (DefaultWebListen).

scheduler, err := jobscheduler.New(db, jobscheduler.Config{
	MaxConcurrentJobs: 5,
	JobTimeout:        30 * time.Second,
	EnableWebUI:       false,
	WebListen:         "127.0.0.1:8080",
})
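
Whether New fills in zero-valued fields is not stated here, so a caller may prefer to apply defaults explicitly. The sketch below shows one way to do that with local stand-ins: config mirrors the fields of Config, the constants copy the documented Default* values, and withDefaults is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented default values.
const (
	defaultMaxConcurrentJobs = 10
	defaultJobTimeout        = 30 * time.Second
	defaultWebListen         = "localhost:8080"
)

// config mirrors the fields of jobscheduler.Config for illustration.
type config struct {
	MaxConcurrentJobs int
	JobTimeout        time.Duration
	EnableWebUI       bool
	WebListen         string
}

// withDefaults returns a copy of c in which unset (zero or negative)
// fields are replaced by the defaults above.
func withDefaults(c config) config {
	if c.MaxConcurrentJobs <= 0 {
		c.MaxConcurrentJobs = defaultMaxConcurrentJobs
	}
	if c.JobTimeout <= 0 {
		c.JobTimeout = defaultJobTimeout
	}
	if c.WebListen == "" {
		c.WebListen = defaultWebListen
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", withDefaults(config{}))
	fmt.Printf("%+v\n", withDefaults(config{MaxConcurrentJobs: 3, EnableWebUI: true}))
}
```

When the real package is available, starting from DefaultConfig() and overriding individual fields achieves the same effect without a helper.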

🛠️ API Overview

| Method     | Description                                  |
|------------|----------------------------------------------|
| Schedule   | Register a new job (or reschedule if exists) |
| Reschedule | Update the schedule and metadata             |
| Unschedule | Remove the job from DB and cron              |
| RunNow     | Run job immediately                          |
| ListJobs   | Retrieve all jobs                            |
| GetJob     | Lookup job by ID or name                     |
| Stop       | Stop the scheduler                           |

🧩 Example: Multiple Jobs

scheduler.Schedule("hello", "*/5 * * * *", func() {
	fmt.Println("Hello from job!")
}, nil)

scheduler.Schedule("system-metrics", "0 */1 * * *", collectMetrics, map[string]any{
	"source": "agent",
})

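// List all jobs and report the ones that have failed at least once.
jobs, err := scheduler.ListJobs()
if err != nil {
	log.Fatal(err)
}
for _, job := range jobs {
	if job.FailCount > 0 {
		fmt.Printf("Job %s failed %d times. Last error: %s\n",
			job.Name, job.FailCount, job.LastError)
	}
}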

📘 Job Model Schema

Internally, jobs are stored with the following fields:

  • JobID: UUID
  • Name: Unique job name
  • Spec: Cron expression
  • Payload.Func: Registered function name (via reflection)
  • Payload.Data: Metadata map
  • Status: Scheduled / Running / Success / Failed
  • RunCount: Total runs
  • SuccessCount: Successful runs
  • FailCount: Failed runs
  • LastError: Error from last failure (if any)
  • LastRun: Timestamp of last run
  • NextRun: Timestamp of next run (auto-calculated)
  • Latency: Duration of last execution in milliseconds
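
How the tracking fields might change after each run can be sketched with a small local type. jobStats copies a subset of the fields above, and runAndRecord, okJob, and failingJob are hypothetical helpers; the package's own bookkeeping may differ in detail (for example, in whether LastError is cleared after a success).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// jobStats holds the execution-tracking subset of the job fields.
type jobStats struct {
	Status       string
	RunCount     int
	SuccessCount int
	FailCount    int
	LastError    string
	LastRun      *time.Time
	Latency      int64 // milliseconds
}

// runAndRecord executes fn, measures its duration, and updates the counters.
// LastError is kept after a later success, matching "error from last failure".
func runAndRecord(s *jobStats, fn func() error) {
	start := time.Now()
	s.Status = "running"

	err := fn()

	s.RunCount++
	s.LastRun = &start
	s.Latency = time.Since(start).Milliseconds()
	if err != nil {
		s.FailCount++
		s.LastError = err.Error()
		s.Status = "failed"
		return
	}
	s.SuccessCount++
	s.Status = "success"
}

func okJob() error { return nil }

func failingJob() error { return errors.New("boom") }

func main() {
	var s jobStats
	runAndRecord(&s, okJob)
	runAndRecord(&s, failingJob)
	fmt.Printf("status=%s runs=%d ok=%d failed=%d lastError=%q\n",
		s.Status, s.RunCount, s.SuccessCount, s.FailCount, s.LastError)
}
```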

🖥️ Optional Web UI

If EnableWebUI is set to true, a basic web dashboard is served.

Config{
	EnableWebUI: true,
	WebListen:   "127.0.0.1:8080",
}

This dashboard provides:

  • Job list and status
  • Next/last run timestamps
  • Run/fail counts

Documentation

For detailed documentation, including configuration options and advanced usage, please refer to the GoDoc page.

Contributing

Contributions are welcome! Please read the CONTRIBUTING.md for guidelines on how to contribute to this project.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Constants

const (
	DefaultMaxConcurrentJobs = 10
	DefaultJobTimeout        = 30 * time.Second
	DefaultWebListen         = "localhost:8080"
)

Default configuration values

Variables

var (
	ErrJobNameEmpty         = errors.New("job name cannot be empty")
	ErrJobFunctionNil       = errors.New("job function cannot be nil")
	ErrJobFunctionInvalid   = errors.New("job function must be a valid function")
	ErrJobFunctionNameEmpty = errors.New("could not determine function name")
	ErrJobNotFound          = errors.New("job not found")
	ErrFunctionNotFound     = errors.New("function not registered")
	ErrJobTimeout           = errors.New("job execution timed out")
	ErrJobAlreadyRunning    = errors.New("job is already running")
	ErrConcurrencyLimit     = errors.New("concurrency limit reached")
)

Error definitions for the job scheduler

Functions

This section is empty.

Types

type Config

type Config struct {
	MaxConcurrentJobs int           // Maximum number of concurrent jobs
	JobTimeout        time.Duration // Timeout duration for job execution
	EnableWebUI       bool          // Whether to enable web interface
	WebListen         string        // Web interface listen address
}

Config holds the configuration for the job scheduler

func DefaultConfig added in v0.0.2

func DefaultConfig() Config

DefaultConfig returns the default configuration

type Job added in v0.0.2

type Job struct {
	gorm.Model
	JobID        uuid.UUID  `gorm:"type:uuid;primaryKey"`
	Name         string     `gorm:"not null;size:255"`
	Spec         string     `gorm:"not null;size:100"`
	Payload      JobPayload `gorm:"type:jsonb"`
	Status       JobStatus  `gorm:"index;size:20;default:scheduled"`
	LastRun      *time.Time `gorm:"index"`
	NextRun      time.Time  `gorm:"index"`
	RunCount     int        `gorm:"default:0"`
	SuccessCount int        `gorm:"default:0"`
	FailCount    int        `gorm:"default:0"`
	LastError    string     `gorm:"type:text"`
	Latency      int64      `gorm:"default:0"` // milliseconds
}

Job represents a scheduled job

type JobPayload added in v0.0.2

type JobPayload struct {
	Type JobType        `json:"type"`
	Name string         `json:"name"`
	Func string         `json:"func,omitempty"`
	Data map[string]any `json:"data,omitempty"`
}

JobPayload contains the execution details of a job

func (*JobPayload) Scan added in v0.0.2

func (jp *JobPayload) Scan(value any) error

Scan implements the Scanner interface for JobPayload

func (JobPayload) Value added in v0.0.2

func (jp JobPayload) Value() (driver.Value, error)

Value implements the driver Valuer interface for JobPayload
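
A Scanner/Valuer pair for a JSON column is typically written along the following lines. This is a sketch using only the standard library: the local payload type copies the JSON tags of JobPayload but uses a plain string for Type, and the method bodies are an assumption about how such a pair is usually implemented, not the package's source.

```go
package main

import (
	"database/sql"
	"database/sql/driver"
	"encoding/json"
	"fmt"
)

// payload is a local stand-in for JobPayload with the same JSON tags.
type payload struct {
	Type string         `json:"type"`
	Name string         `json:"name"`
	Func string         `json:"func,omitempty"`
	Data map[string]any `json:"data,omitempty"`
}

// Compile-time checks that payload satisfies both interfaces.
var (
	_ driver.Valuer = payload{}
	_ sql.Scanner   = (*payload)(nil)
)

// Value encodes the payload as JSON so it can be stored in a JSON column.
func (p payload) Value() (driver.Value, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return b, nil
}

// Scan decodes a database value (bytes, string, or NULL) into the payload.
func (p *payload) Scan(value any) error {
	switch v := value.(type) {
	case nil:
		*p = payload{}
		return nil
	case []byte:
		return json.Unmarshal(v, p)
	case string:
		return json.Unmarshal([]byte(v), p)
	default:
		return fmt.Errorf("unsupported type %T for payload", value)
	}
}

func main() {
	in := payload{
		Type: "function",
		Name: "system-metrics",
		Func: "main.collectMetrics",
		Data: map[string]any{"source": "agent"},
	}
	stored, err := in.Value()
	if err != nil {
		panic(err)
	}

	var out payload
	if err := out.Scan(stored); err != nil {
		panic(err)
	}
	fmt.Println(out.Name, out.Func, out.Data["source"])
}
```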

type JobStatus added in v0.0.2

type JobStatus string

JobStatus represents the status of a job

const (
	StatusScheduled JobStatus = "scheduled"
	StatusRunning   JobStatus = "running"
	StatusSuccess   JobStatus = "success"
	StatusFailed    JobStatus = "failed"
)

type JobType added in v0.0.2

type JobType string

JobType represents the type of job

const (
	TypeFunction JobType = "function"
	TypeHTTP     JobType = "http"
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages job scheduling and execution

func New

func New(db *gorm.DB, config Config) (*Scheduler, error)

New creates a new Scheduler instance

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(identifier any) (*Job, error)

GetJob retrieves a job by ID or name

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs() ([]Job, error)

ListJobs returns all jobs

func (*Scheduler) Reschedule

func (s *Scheduler) Reschedule(identifier any, newSpec string, newMetadata map[string]any) (*Job, error)

Reschedule updates an existing job

func (*Scheduler) RunNow

func (s *Scheduler) RunNow(identifier any) error

RunNow executes a job immediately

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(name, spec string, function any, metadata map[string]any) (*Job, error)

Schedule creates and schedules a new job

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop shuts down the scheduler

func (*Scheduler) Unschedule

func (s *Scheduler) Unschedule(identifier any) error

Unschedule removes a job

type Storage added in v0.0.2

type Storage interface {
	CreateJob(ctx context.Context, job *Job) error
	UpdateJob(ctx context.Context, job *Job) error
	DeleteJob(ctx context.Context, uuid uuid.UUID) error
	GetJobByJobID(ctx context.Context, uuid uuid.UUID) (*Job, error)
	GetJobByName(ctx context.Context, name string) (*Job, error)
	ListJobs(ctx context.Context) ([]Job, error)
}

Storage defines the interface for job persistence

type WebUI added in v0.0.2

type WebUI struct {
	// contains filtered or unexported fields
}

func NewWebUI added in v0.0.2

func NewWebUI(scheduler *Scheduler) *WebUI

func (*WebUI) Start added in v0.0.2

func (ui *WebUI) Start(listen string) error
