loadgen

package
v0.0.0-...-e33cdd0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: MIT Imports: 29 Imported by: 2

Documentation

Index

Constants

View Source
const BaseIterationRetryBackoff = 1 * time.Second
View Source
const DefaultIterations = 10
View Source
const DefaultMaxConcurrentIterations = 10
View Source
const DefaultMaxIterationAttempts = 1
View Source
const MaxIterationRetryBackoff = 60 * time.Second
View Source
const OmesExecutionIDSearchAttribute = "OmesExecutionID"

Variables

This section is empty.

Functions

func ExportWorkflowHistories

func ExportWorkflowHistories(ctx context.Context, info ScenarioInfo) error

ExportWorkflowHistories exports workflow histories based on filter settings.

func GetScenarios

func GetScenarios() map[string]*Scenario

GetScenarios gets a copy of registered scenarios

func InitSearchAttribute

func InitSearchAttribute(
	ctx context.Context,
	info ScenarioInfo,
	attributeName string,
) error

InitSearchAttribute ensures that a search attribute is defined in the namespace. It creates the search attribute if it doesn't exist, or verifies it exists if it does. This implementation matches the throughput_stress initSearchAttribute behavior.

func MinVisibilityCountEventually

func MinVisibilityCountEventually(
	ctx context.Context,
	info ScenarioInfo,
	request *workflowservice.CountWorkflowExecutionsRequest,
	minCount int,
	waitAtMost time.Duration,
) error

func MustRegisterScenario

func MustRegisterScenario(scenario Scenario)

MustRegisterScenario registers a scenario in the global static registry. Panics if registration fails. The file name of the caller is used as the scenario name.

func TaskQueueForRun

func TaskQueueForRun(runID string) string

TaskQueueForRun returns the task queue name for the given run ID.

func VerifyNoFailedWorkflows

func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttribute, runID string) error

VerifyNoFailedWorkflows verifies that there are no failed or terminated workflows for the given search attribute.

Types

type Configurable

type Configurable interface {
	// Configure the executor with the given scenario info.
	//
	// Call this method if you want to ensure that all required configuration parameters
	// are present and valid without actually running the executor.
	Configure(ScenarioInfo) error
}

Optional interface that can be implemented by an Executor to make it configurable.

type DistributionField

type DistributionField[T distValueType] struct {
	// contains filtered or unexported fields
}

DistributionField is a generic wrapper for any distribution that can be serialized to/from JSON.

Supported distribution types:

  • "fixed" - Fixed distribution that always returns the same value. Example: {"type": "fixed", "value": "500"}

  • "discrete" - Discrete distribution with weighted values. Example: {"type": "discrete", "weights": {"100": 1, "200": 2, "300": 3}}

  • "uniform" - Uniform distribution. Example: {"type": "uniform", "min": "100", "max": "1000"}

  • "zipf" - Zipf distribution. Example: {"type": "zipf", "s": 2.0, "v": 1.0, "n": 100}

  • "normal" - Normal distribution. Example: {"type": "normal", "mean": "500", "stdDev": "100", "min": "0", "max": "1000"}

func NewDiscreteDistribution

func NewDiscreteDistribution[T distValueType](weights map[T]int) DistributionField[T]

func NewFixedDistribution

func NewFixedDistribution[T distValueType](value T) DistributionField[T]

func (DistributionField[T]) MarshalJSON

func (df DistributionField[T]) MarshalJSON() ([]byte, error)

NOTE: each distribution's marshaller is responsible for setting the "type" field.

func (*DistributionField[T]) UnmarshalJSON

func (df *DistributionField[T]) UnmarshalJSON(data []byte) error

type Executor

type Executor interface {
	// Run the scenario
	Run(context.Context, ScenarioInfo) error
}

Executor for a scenario.

type ExecutorFunc

type ExecutorFunc func(context.Context, ScenarioInfo) error

ExecutorFunc is an Executor implementation for a function

func (ExecutorFunc) Run

func (e ExecutorFunc) Run(ctx context.Context, info ScenarioInfo) error

Run implements [Executor.Run].

type ExportOptions

type ExportOptions struct {
	// Directory to export histories (empty = disabled)
	ExportHistoriesDir string
	// Status filter: "failed", "terminated", "failed,terminated", "all"
	ExportHistoriesFilter string
}

ExportOptions contains configuration for exporting scenario data.

type FileOrArgs

type FileOrArgs struct {
	// If set, the file to load the input from
	FilePath string
	// If set, args to pass to the Rust input generator. Do not specify output args, as it is
	// expected that the proto output is written to stdout (the default).
	Args []string
}

type FuzzExecutor

type FuzzExecutor struct {
	// Must be specified, called once on startup, and determines how TestInputs will be used for
	// iterations of the scenario. If a file is specified, it will be loaded and used as the input
	// for every iteration. If generator args are specified, it will be invoked once per iteration
	// and those inputs will be saved and then fed out to each iteration.
	InitInputs func(context.Context, ScenarioInfo) FileOrArgs

	DefaultConfiguration RunConfiguration
}

func (FuzzExecutor) GetDefaultConfiguration

func (k FuzzExecutor) GetDefaultConfiguration() RunConfiguration

func (FuzzExecutor) Run

func (k FuzzExecutor) Run(ctx context.Context, info ScenarioInfo) error

type GenericExecutor

type GenericExecutor struct {
	// Function to execute a single iteration of this scenario
	Execute func(context.Context, *Run) error
}

func (*GenericExecutor) Run

func (g *GenericExecutor) Run(ctx context.Context, info ScenarioInfo) error

type HasDefaultConfiguration

type HasDefaultConfiguration interface {
	GetDefaultConfiguration() RunConfiguration
}

HasDefaultConfiguration is an interface executors can implement to show their default configuration.

type KitchenSinkExecutor

type KitchenSinkExecutor struct {
	TestInput *kitchensink.TestInput

	// Called once on start
	PrepareTestInput func(context.Context, ScenarioInfo, *kitchensink.TestInput) error

	// Called for each iteration. TestInput is copied entirely into KitchenSinkWorkflowOptions on
	// each iteration.
	UpdateWorkflowOptions func(context.Context, *Run, *KitchenSinkWorkflowOptions) error
}

func (KitchenSinkExecutor) Run

type KitchenSinkWorkflowOptions

type KitchenSinkWorkflowOptions struct {
	Params       *kitchensink.TestInput
	StartOptions client.StartWorkflowOptions
}

type Resumable

type Resumable interface {
	// LoadState loads a snapshot into the executor's internal state.
	//
	// Implementations should pass a reference to a state variable to the loader function and assign to their internal state.
	// Callers should call this function before invoking the executor's Run method.
	LoadState(loader func(any) error) error
	// Snapshot returns a snapshot of the executor's internal state. The returned value must be serializable.
	//
	// The serialization format should be supported by the caller of this function.
	// Callers may call this function periodically to get a snapshot of the executor's state.
	Snapshot() any
}

Optional interface that can be implemented by an Executor to allow it to be resumable.

type Run

type Run struct {
	// Do not mutate this, this is shared across the entire scenario
	*ScenarioInfo

	// Each run should have a unique iteration.
	Iteration int
	Logger    *zap.SugaredLogger
	// contains filtered or unexported fields
}

Run represents an individual scenario run (many may be in a single instance (of possibly many) of a scenario).

func (*Run) DefaultKitchenSinkWorkflowOptions

func (r *Run) DefaultKitchenSinkWorkflowOptions() KitchenSinkWorkflowOptions

DefaultKitchenSinkWorkflowOptions gets the default kitchen sink workflow info.

func (*Run) DefaultStartWorkflowOptions

func (r *Run) DefaultStartWorkflowOptions() client.StartWorkflowOptions

DefaultStartWorkflowOptions gets default start workflow info.

func (*Run) ExecuteAnyWorkflow

func (r *Run) ExecuteAnyWorkflow(ctx context.Context, options client.StartWorkflowOptions, workflow any, valuePtr any, args ...any) error

ExecuteAnyWorkflow wraps calls to the client executing workflows to include some logging, returning an error if the execution fails.

func (*Run) ExecuteKitchenSinkWorkflow

func (r *Run) ExecuteKitchenSinkWorkflow(ctx context.Context, options *KitchenSinkWorkflowOptions) error

ExecuteKitchenSinkWorkflow starts the generic "kitchen sink" workflow and waits for its completion ignoring its result. Concurrently it will perform any client actions specified in kitchensink.TestInput.ClientSequence

func (*Run) ShouldRetry

func (r *Run) ShouldRetry(err error) (time.Duration, bool)

ShouldRetry determines if another attempt should be made. It returns the backoff duration to wait before retrying and a boolean indicating whether a retry should occur.

func (*Run) TaskQueue

func (r *Run) TaskQueue() string

type RunConfiguration

type RunConfiguration struct {
	// Number of iterations to run of this scenario (mutually exclusive with Duration).
	Iterations int
	// StartFromIteration is the iteration to start from when resuming a run.
	// This is used to skip iterations that have already been run.
	// Default is zero. If Iterations is set, too, must be less than or equal to Iterations.
	StartFromIteration int
	// MaxIterationAttempts is the maximum number of attempts to run the scenario.
	// Default (and minimum) is 1.
	MaxIterationAttempts int
	// Duration limit of this scenario (mutually exclusive with Iterations). If neither iterations
	// nor duration is set, default is DefaultIterations. When the Duration is elapsed, no new
	// iterations will be started, but we will wait for any currently running iterations to
	// complete.
	Duration time.Duration
	// Maximum number of instances of the Execute method to run concurrently.
	// Default is DefaultMaxConcurrent.
	MaxConcurrent int
	// MaxIterationsPerSecond is the maximum number of iterations to run per second.
	// Default is zero, meaning unlimited.
	MaxIterationsPerSecond float64
	// Timeout is the maximum amount of time we'll wait for the scenario to finish running.
	// If the timeout is hit any pending executions will be cancelled and the scenario will exit
	// with an error. The default is unlimited.
	Timeout time.Duration
	// Do not register the default search attributes used by scenarios. If the SAs are not registered
	// by the run, they must be registered by some other method. This is needed because cloud cells
	// cannot use the SDK to register SAs, instead the SAs must be registered through the control plane.
	// Default is false.
	DoNotRegisterSearchAttributes bool
	// IgnoreAlreadyStarted, if set, will not error when a workflow with the same ID already exists.
	// Default is false.
	IgnoreAlreadyStarted bool
	// OnCompletion, if set, is invoked after each successful iteration completes.
	OnCompletion func(context.Context, *Run)
	// HandleExecuteError, if set, is called when Execute returns an error, allowing transformation of errors.
	HandleExecuteError func(context.Context, *Run, error) error
}

func (*RunConfiguration) ApplyDefaults

func (r *RunConfiguration) ApplyDefaults()

func (RunConfiguration) Validate

func (r RunConfiguration) Validate() error

type Scenario

type Scenario struct {
	Description string
	ExecutorFn  func() Executor
}

func GetScenario

func GetScenario(name string) *Scenario

GetScenario gets a scenario by name from the global static registry.

type ScenarioInfo

type ScenarioInfo struct {
	// Name of the scenario (inferred from the file name)
	ScenarioName string
	// Run ID of the current scenario run, used to generate a unique task queue
	// and workflow ID prefix. This is a single value for the whole scenario, and
	// not a Workflow RunId.
	RunID string
	// ExecutionID is a randomly generated ID that uniquely identifies this particular
	// execution of the scenario. Combined with RunID, it ensures no two executions collide.
	ExecutionID string
	// Metrics component for registering new metrics.
	MetricsHandler client.MetricsHandler
	// A zap logger.
	Logger *zap.SugaredLogger
	// A Temporal client.
	Client client.Client
	// Configuration info passed by user if any.
	Configuration RunConfiguration
	// ScenarioOptions are info passed from the command line. Do not mutate these.
	ScenarioOptions map[string]string
	// The namespace that was used when connecting the client.
	Namespace string
	// Path to the root of the omes dir
	RootPath string
	// ExportOptions contains export-related configuration
	ExportOptions ExportOptions
}

ScenarioInfo contains information about the scenario under execution.

func (*ScenarioInfo) NewRun

func (s *ScenarioInfo) NewRun(iteration int) *Run

NewRun creates a new run.

func (*ScenarioInfo) RegisterDefaultSearchAttributes

func (s *ScenarioInfo) RegisterDefaultSearchAttributes(ctx context.Context) error

func (*ScenarioInfo) ScenarioOptionBool

func (s *ScenarioInfo) ScenarioOptionBool(name string, defaultValue bool) bool

func (*ScenarioInfo) ScenarioOptionDuration

func (s *ScenarioInfo) ScenarioOptionDuration(name string, defaultValue time.Duration) time.Duration

func (*ScenarioInfo) ScenarioOptionFloat

func (s *ScenarioInfo) ScenarioOptionFloat(name string, defaultValue float64) float64

func (*ScenarioInfo) ScenarioOptionInt

func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int

func (*ScenarioInfo) ScenarioOptionString

func (s *ScenarioInfo) ScenarioOptionString(name string, defaultValue string) string

type SleepActivityConfig

type SleepActivityConfig struct {
	// Distribution of how many sleep activities to run per iteration. Required.
	Count *DistributionField[int64] `json:"count"`

	// Map of groups to their configuration. Required.
	Groups map[string]SleepActivityGroupConfig `json:"groups"`
}

SleepActivityConfig defines the configuration for sleep activities with flexible distribution support

func ParseAndValidateSleepActivityConfig

func ParseAndValidateSleepActivityConfig(jsonStr string, requireCount bool) (*SleepActivityConfig, error)

func (*SleepActivityConfig) Sample

Sample generates a list of SleepActivityInput instances based on the SleepActivityConfig.

type SleepActivityGroupConfig

type SleepActivityGroupConfig struct {
	// Weight for this group, used for sampling. Defaults to 1.
	Weight int `json:"weight"`

	// Distribution for sleep duration within this group. Required.
	SleepDuration *DistributionField[time.Duration] `json:"sleepDuration"`

	// Distribution for priority keys within this group. Optional.
	PriorityKeys *DistributionField[int64] `json:"priorityKeys"`

	// Distribution for fairness keys within this group. Optional.
	FairnessKeys *DistributionField[int64] `json:"fairnessKeys"`

	// Distribution for fairness weight within this group. Optional.
	FairnessWeight *DistributionField[float32] `json:"fairnessWeight"`
}

SleepActivityGroupConfig defines a group configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL