group

package module
v0.0.0-...-b976376 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2025 License: MIT Imports: 20 Imported by: 0

README ΒΆ

[Group] Concurrency Kit

⚑ A lightweight, dependency-aware (yet another DAG) concurrency toolkit built on top of std errgroup, providing fine-grained control over concurrent task execution with minimal overhead.

Much less overhead by optional features selected

APIs

Go(ctx, opts, fs...) TryGo(ctx, opts, fs...) bool

GoCtx(ctx, opts, fs...) TryGoCtx(ctx, opts, fs...) bool


Opts(With...) *Options


NewGroup() *Group

(Group).Go(ctx) error

(Group).Add... *node (Group).Node(key) *node (Group).Verify(panicking) *node


WithStore(ctx, store) context.Context

Store(ctx, value) Fetch(ctx, key) Put(ctx, key, value)

Options

Get options by Opts(With...)

Available options:

  • WithPrefix(string) - Set group name for logging
  • WithLimit(int) - Set concurrency limit
  • WithPreFunc(PreFunc) - Set group pre-execution interceptor
  • WithAfterFunc(AfterFunc) - Set group post-execution interceptor
  • WithTimeout(time.Duration) - Set group timeout
  • WithLogger(*slog.Logger) - Use custom logger
  • WithLog - Enable logging
  • WithErrorCollector(chan error) - Collect errors in channel

Group Mode (DAG)

Features
  • πŸ”— Dependency Management: Define task dependencies with automatic execution ordering
  • 🧩 Weak Dependencies: Continue execution even when upstream tasks fail
  • πŸ“¦ Built-in Store: Share data between dependent tasks using context-based storage
  • πŸ’₯ Fail Strategy Control: Configure tasks with different failure behaviors
  • πŸ”„ Retry Mechanism: Configure automatic retry for individual nodes
  • 🎣 Interceptors: Pre and post-execution hooks at both group and node level
  • πŸ”™ Rollback Mechanism: Define compensation logic to revert changes when tasks fail
  • ⏱️ Timeout Control: Set timeouts at both group and node level
  • πŸ“Š Monitoring & Logging: Optional execution monitoring and logging
  • 🎨 Graphviz: Visualize complex dependency graphs in multiple formats
πŸƒ Error Propagation

Within a group, errors propagate according to dependency order, eventually returning only leaf errors that have already aggregated parent errors. If multiple leaf errors exist, they are aggregated using errors.Join (when a fast-fail error occurs, only the aggregated error from the fast-fail node is returned).

🎯 Fail Strategies
  • Default: Node errors propagate to downstreams and are included in final error aggregation
  • Fast-Fail: Halt entire group execution immediately on node error (only this error is warpped and returned)
  • Silent-Fail: Suppress error from final result but still block downstreams (no error recorded)

Note: Fast-Fail and Silent-Fail can be used simultaneously. When error occurs, a sentinel error will be used as the actual error (context.Canceled) and halt the entire group

Usage
[Basic Workflow]

Create a group using NewGroup with optional configurations, then add tasks using Add.... Each task can be assigned a unique key and specify its dependencies. Finally, execute the group with Go method.

[Task Types]

πŸš€ Simple Runner - Basic function that returns an error. No access to context or shared state.

🚁 Context-Aware Task - Receives a context parameter, allowing the task to respond to cancellation signals and timeouts. Context-aware tasks will be able to communicate data through Store and Fetch when using with storer-context. Additionally, you can directly insert key-value pairs into the storer-context by using Put.

🚒 Shared-State Task - Receives the context along with a shared state unit, enabling tasks to access and modify common data structures. Shared-state tasks will be able to access predefined shared data via the shared argument passed in (❗❗ beware of potential data race).

[Node Configuration]
  • Key(any) - Assign unique identifier
  • Dep(...any) - Add strong dependencies (blocks on upstream errors)
  • WeakDep(...any) - Add weak dependencies (continues on upstream errors)
  • FastFail() - Halt entire group on node error
  • SilentFail() - Suppress error but block downstreams
  • WithRetry(int) - Set retry attempts on failure
  • WithPreFunc(NodePreFunc) - Set node pre-execution interceptor
  • WithAfterFunc(NodeAfterFunc) - Set node post-execution interceptor
  • WithRollback(RollbackFunc) - Set compensation function executed on failure
  • WithTimeout(time.Duration) - Set node-specific timeout
[More...]

Refer to the example package in this repo

Verify

Verify checks for cycles in the dependency graph by using group.Verify() or Node.Verify()

Graphviz

Visualize your dependency graph by using

(Group).[RenderGraph/RenderGraphImage/RenderGraphToFile/DOT/GraphURL]


Benchmark

goos: darwin
goarch: arm64
pkg: github.com/oatcatx/group/benchmark
cpu: Apple M3 Pro
BenchmarkGo

BenchmarkGo/TinyWorkload
BenchmarkGo/TinyWorkload/StdGoroutine
BenchmarkGo/TinyWorkload/StdGoroutine-12         	   78626	     15192 ns/op	     259 B/op	      11 allocs/op
BenchmarkGo/TinyWorkload/StdErrGroup
BenchmarkGo/TinyWorkload/StdErrGroup-12          	   50954	     26485 ns/op	     400 B/op	      13 allocs/op
BenchmarkGo/TinyWorkload/Go
BenchmarkGo/TinyWorkload/Go-12                   	   40836	     29868 ns/op	    1105 B/op	      25 allocs/op
BenchmarkGo/TinyWorkload/GoWithOpts
BenchmarkGo/TinyWorkload/GoWithOpts-12           	   20181	     62942 ns/op	    5218 B/op	     102 allocs/op

BenchmarkGo/SmallWorkload
BenchmarkGo/SmallWorkload/StdGoroutine
BenchmarkGo/SmallWorkload/StdGoroutine-12        	     832	   1253898 ns/op	   12129 B/op	     201 allocs/op
BenchmarkGo/SmallWorkload/StdErrGroup
BenchmarkGo/SmallWorkload/StdErrGroup-12         	     956	   1277130 ns/op	   12197 B/op	     203 allocs/op
BenchmarkGo/SmallWorkload/Go
BenchmarkGo/SmallWorkload/Go-12                  	     922	   1318295 ns/op	   17186 B/op	     305 allocs/op
BenchmarkGo/SmallWorkload/GoWithOpts
BenchmarkGo/SmallWorkload/GoWithOpts-12          	     849	   1446455 ns/op	   49307 B/op	     946 allocs/op

BenchmarkGo/MediumWorkload
BenchmarkGo/MediumWorkload/StdGoroutine
BenchmarkGo/MediumWorkload/StdGoroutine-12       	     184	   6491799 ns/op	  122766 B/op	    2005 allocs/op
BenchmarkGo/MediumWorkload/StdErrGroup
BenchmarkGo/MediumWorkload/StdErrGroup-12        	     182	   6548971 ns/op	  120220 B/op	    2003 allocs/op
BenchmarkGo/MediumWorkload/Go
BenchmarkGo/MediumWorkload/Go-12                 	     163	   7078103 ns/op	  168493 B/op	    3005 allocs/op
BenchmarkGo/MediumWorkload/GoWithOpts
BenchmarkGo/MediumWorkload/GoWithOpts-12         	     165	   7278053 ns/op	  480847 B/op	    9282 allocs/op

BenchmarkGo/LargeWorkload
BenchmarkGo/LargeWorkload/StdGoroutine
BenchmarkGo/LargeWorkload/StdGoroutine-12               21	  52983192 ns/op	 1290996 B/op	   20188 allocs/op
BenchmarkGo/LargeWorkload/StdErrGroup
BenchmarkGo/LargeWorkload/StdErrGroup-12         	      13	  82499465 ns/op	 1239411 B/op	   20183 allocs/op
BenchmarkGo/LargeWorkload/Go
BenchmarkGo/LargeWorkload/Go-12                  	      15	  71173006 ns/op	 1680838 B/op	   30009 allocs/op
BenchmarkGo/LargeWorkload/GoWithOpts
BenchmarkGo/LargeWorkload/GoWithOpts-12          	      14	  73112423 ns/op	 4790445 B/op	   92600 allocs/op

goos: darwin
goarch: arm64
pkg: github.com/oatcatx/group/benchmark
cpu: Apple M3 Pro
BenchmarkGroup

BenchmarkGroup/StdErrGroup
BenchmarkGroup/StdErrGroup-12         	   79412	     18070 ns/op	     641 B/op	      17 allocs/op
BenchmarkGroup/Group
BenchmarkGroup/Group-12               	   50367	     24134 ns/op	    2680 B/op	      45 allocs/op

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

This section is empty.

Variables ΒΆ

View Source
var ErrPanic = errors.New("panic recovered")
View Source
var WithLog option = func(o *Options) { o.log = true }

Functions ΒΆ

func Fetch ΒΆ

func Fetch[T any](ctx context.Context, key any) (T, bool)

func Go ΒΆ

func Go(ctx context.Context, opts *Options, fs ...func() error) (err error)

func GoCtx ΒΆ

func GoCtx(ctx context.Context, opts *Options, fs ...func(context.Context) error) error

func NewMapStore ΒΆ

func NewMapStore() *mapStore

copy-on-write map store [ideal for read-heavy scenarios]

func Put ΒΆ

func Put[K, V any](ctx context.Context, key K, value V)

func RecoverContext ΒΆ

func RecoverContext(ctx context.Context, err *error)

func SafeRun ΒΆ

func SafeRun(ctx context.Context, f func() error) (err error)

func SafeRunNode ΒΆ

func SafeRunNode(ctx context.Context, f func(context.Context, any) error, shared any) (err error)

func Store ΒΆ

func Store[V any](ctx context.Context, value V)

func TryGo ΒΆ

func TryGo(ctx context.Context, opts *Options, fs ...func() error) (ok bool, err error)

func TryGoCtx ΒΆ

func TryGoCtx(ctx context.Context, opts *Options, fs ...func(context.Context) error) (bool, error)

func WithAfterFunc ΒΆ

func WithAfterFunc(f AfterFunc) option

func WithErrorCollector ΒΆ

func WithErrorCollector(errC chan error) option

func WithLimit ΒΆ

func WithLimit(x int) option

func WithLogger ΒΆ

func WithLogger(logger *slog.Logger) option

func WithPreFunc ΒΆ

func WithPreFunc(f PreFunc) option

func WithPrefix ΒΆ

func WithPrefix(s string) option

func WithStore ΒΆ

func WithStore(ctx context.Context, store Storer) context.Context

WithStore returns a new context with the provided store (storer-context)

func WithTimeout ΒΆ

func WithTimeout(t time.Duration) option

Types ΒΆ

type AfterFunc ΒΆ

type AfterFunc func(context.Context, error) error

type AutoNode ΒΆ

type AutoNode interface {
	Exec(ctx context.Context, shared any) (any, error)
	// contains filtered or unexported methods
}

type GraphOptions ΒΆ

type GraphOptions struct {
	Title           string           // graph title
	Format          graphviz.Format  // output format
	RankDir         cgraph.RankDir   // direction: TB (top-bottom), LR (left-right), BT, RL
	NodeShape       cgraph.Shape     // node shape: box, ellipse, circle, etc.
	NodeColor       string           // node color
	FastFailColor   string           // color for fast-fail nodes
	SilentFailColor string           // color for silent-fail nodes
	EdgeColor       string           // edge color
	WeakEdgeStyle   cgraph.EdgeStyle // style for weak dependency edges (dashed, dotted)
	ShowGroupInfo   bool             // show group options in title
	ShowNodeSpec    bool             // show node spec details
}

func DefaultGraphOptions ΒΆ

func DefaultGraphOptions() *GraphOptions

type Group ΒΆ

type Group struct {
	Options
	// contains filtered or unexported fields
}

func NewGroup ΒΆ

func NewGroup(opts ...option) *Group

Use [Add...] methods to add different types of nodes to the group

  • [AddAuto...] adds an auto node that returns a value and automatically stores it in the context store
  • CAUTION: will PANIC if auto nodes are not used with storer-context

func (*Group) AddAutoNode ΒΆ

func (g *Group) AddAutoNode(n AutoNode) *node

func (*Group) AddAutoRunner ΒΆ

func (g *Group) AddAutoRunner(runner func() (any, error)) *node

func (*Group) AddAutoSharedRunner ΒΆ

func (g *Group) AddAutoSharedRunner(runner func(any) (any, error)) *node

func (*Group) AddAutoSharedTask ΒΆ

func (g *Group) AddAutoSharedTask(task func(context.Context, any) (any, error)) *node

func (*Group) AddAutoTask ΒΆ

func (g *Group) AddAutoTask(task func(context.Context) (any, error)) *node

func (*Group) AddNode ΒΆ

func (g *Group) AddNode(n Node) *node

func (*Group) AddRunner ΒΆ

func (g *Group) AddRunner(runner func() error) *node

func (*Group) AddSharedRunner ΒΆ

func (g *Group) AddSharedRunner(runner func(any) error) *node

func (*Group) AddSharedTask ΒΆ

func (g *Group) AddSharedTask(task func(context.Context, any) error) *node

func (*Group) AddTask ΒΆ

func (g *Group) AddTask(task func(context.Context) error) *node

func (*Group) DOT ΒΆ

func (g *Group) DOT(ctx context.Context, opts *GraphOptions) (string, error)

dot format graph

func (*Group) Go ΒΆ

func (g *Group) Go(ctx context.Context, shared ...any) (err error)

Go runs the group with added nodes

  • if shared units are provided, they will be passed to the shared nodes
  • if len(shared) == 1, the node receives shared[0] (type any)
  • if len(shared) > 1, the node receives shared (type []any)
  • multiple shared units are not recommended

func (*Group) GraphURL ΒΆ

func (g *Group) GraphURL(ctx context.Context, opts *GraphOptions) (string, error)

dot graphviz url

func (*Group) Node ΒΆ

func (g *Group) Node(key any) *node

func (*Group) RenderGraph ΒΆ

func (g *Group) RenderGraph(ctx context.Context, opts *GraphOptions, w io.Writer) error

func (*Group) RenderGraphImage ΒΆ

func (g *Group) RenderGraphImage(ctx context.Context, opts *GraphOptions) (image.Image, error)

func (*Group) RenderGraphToFile ΒΆ

func (g *Group) RenderGraphToFile(ctx context.Context, opts *GraphOptions, filename string) error

func (*Group) Verify ΒΆ

func (g *Group) Verify(panicking bool) string

type Node ΒΆ

type Node interface {
	Exec(ctx context.Context, shared any) error
	// contains filtered or unexported methods
}

type NodeAfterFunc ΒΆ

type NodeAfterFunc func(ctx context.Context, shared any, err error) error

type NodePreFunc ΒΆ

type NodePreFunc func(ctx context.Context, shared any) error

node level interceptor

type NodeRollbackFunc ΒΆ

type NodeRollbackFunc func(ctx context.Context, shared any, err error) error

type Options ΒΆ

type Options struct {
	ErrC chan error // error collector
	// contains filtered or unexported fields
}

func Opts ΒΆ

func Opts(opts ...option) *Options

type PreFunc ΒΆ

type PreFunc func(context.Context) error

group level interceptor

type Storer ΒΆ

type Storer interface {
	Store(key, value any)
	Load(key any) (value any, ok bool)
}

Directories ΒΆ

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL