Documentation
¶
Index ¶
- Constants
- func ExportWorkflowHistories(ctx context.Context, info ScenarioInfo) error
- func GetScenarios() map[string]*Scenario
- func InitSearchAttribute(ctx context.Context, info ScenarioInfo, attributeName string) error
- func MinVisibilityCountEventually(ctx context.Context, info ScenarioInfo, ...) error
- func MustRegisterScenario(scenario Scenario)
- func TaskQueueForRun(runID string) string
- func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttribute, runID string) error
- type Configurable
- type DistributionField
- type Executor
- type ExecutorFunc
- type ExportOptions
- type FileOrArgs
- type FuzzExecutor
- type GenericExecutor
- type HasDefaultConfiguration
- type KitchenSinkExecutor
- type KitchenSinkWorkflowOptions
- type Resumable
- type Run
- func (r *Run) DefaultKitchenSinkWorkflowOptions() KitchenSinkWorkflowOptions
- func (r *Run) DefaultStartWorkflowOptions() client.StartWorkflowOptions
- func (r *Run) ExecuteAnyWorkflow(ctx context.Context, options client.StartWorkflowOptions, workflow any, ...) error
- func (r *Run) ExecuteKitchenSinkWorkflow(ctx context.Context, options *KitchenSinkWorkflowOptions) error
- func (r *Run) ShouldRetry(err error) (time.Duration, bool)
- func (r *Run) TaskQueue() string
- type RunConfiguration
- type Scenario
- type ScenarioInfo
- func (s *ScenarioInfo) NewRun(iteration int) *Run
- func (s *ScenarioInfo) RegisterDefaultSearchAttributes(ctx context.Context) error
- func (s *ScenarioInfo) ScenarioOptionBool(name string, defaultValue bool) bool
- func (s *ScenarioInfo) ScenarioOptionDuration(name string, defaultValue time.Duration) time.Duration
- func (s *ScenarioInfo) ScenarioOptionFloat(name string, defaultValue float64) float64
- func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int
- func (s *ScenarioInfo) ScenarioOptionString(name string, defaultValue string) string
- type SleepActivityConfig
- type SleepActivityGroupConfig
Constants ¶
const BaseIterationRetryBackoff = 1 * time.Second
const DefaultIterations = 10
const DefaultMaxConcurrentIterations = 10
const DefaultMaxIterationAttempts = 1
const MaxIterationRetryBackoff = 60 * time.Second
const OmesExecutionIDSearchAttribute = "OmesExecutionID"
Variables ¶
This section is empty.
Functions ¶
func ExportWorkflowHistories ¶
func ExportWorkflowHistories(ctx context.Context, info ScenarioInfo) error
ExportWorkflowHistories exports workflow histories based on filter settings.
func GetScenarios ¶
GetScenarios gets a copy of the registered scenarios.
func InitSearchAttribute ¶
func InitSearchAttribute( ctx context.Context, info ScenarioInfo, attributeName string, ) error
InitSearchAttribute ensures that a search attribute is defined in the namespace. It creates the search attribute if it doesn't exist; otherwise it verifies that the attribute is already present. This implementation matches the throughput_stress initSearchAttribute behavior.
func MinVisibilityCountEventually ¶
func MinVisibilityCountEventually( ctx context.Context, info ScenarioInfo, request *workflowservice.CountWorkflowExecutionsRequest, minCount int, waitAtMost time.Duration, ) error
func MustRegisterScenario ¶
func MustRegisterScenario(scenario Scenario)
MustRegisterScenario registers a scenario in the global static registry. Panics if registration fails. The file name of the caller is used as the scenario name.
func TaskQueueForRun ¶
TaskQueueForRun returns the task queue name for the given run ID.
func VerifyNoFailedWorkflows ¶
func VerifyNoFailedWorkflows(ctx context.Context, info ScenarioInfo, searchAttribute, runID string) error
VerifyNoFailedWorkflows verifies that there are no failed or terminated workflows for the given search attribute.
Types ¶
type Configurable ¶
type Configurable interface {
// Configure the executor with the given scenario info.
//
// Call this method if you want to ensure that all required configuration parameters
// are present and valid without actually running the executor.
Configure(ScenarioInfo) error
}
Optional interface that can be implemented by an Executor to make it configurable.
type DistributionField ¶
type DistributionField[T distValueType] struct {
// contains filtered or unexported fields
}
DistributionField is a generic wrapper for any distribution that can be serialized to/from JSON.
Supported distribution types:
"fixed" - Fixed distribution that always returns the same value. Example: {"type": "fixed", "value": "500"}
"discrete" - Discrete distribution with weighted values. Example: {"type": "discrete", "weights": {"100": 1, "200": 2, "300": 3}}
"uniform" - Uniform distribution. Example: {"type": "uniform", "min": "100", "max": "1000"}
"zipf" - Zipf distribution. Example: {"type": "zipf", "s": 2.0, "v": 1.0, "n": 100}
"normal" - Normal distribution. Example: {"type": "normal", "mean": "500", "stdDev": "100", "min": "0", "max": "1000"}
func NewDiscreteDistribution ¶
func NewDiscreteDistribution[T distValueType](weights map[T]int) DistributionField[T]
func NewFixedDistribution ¶
func NewFixedDistribution[T distValueType](value T) DistributionField[T]
func (DistributionField[T]) MarshalJSON ¶
func (df DistributionField[T]) MarshalJSON() ([]byte, error)
NOTE: each distribution's marshaller is responsible for setting the "type" field.
func (*DistributionField[T]) UnmarshalJSON ¶
func (df *DistributionField[T]) UnmarshalJSON(data []byte) error
type Executor ¶
type Executor interface {
// Run the scenario
Run(context.Context, ScenarioInfo) error
}
Executor for a scenario.
type ExecutorFunc ¶
type ExecutorFunc func(context.Context, ScenarioInfo) error
ExecutorFunc is an Executor implementation for a function.
func (ExecutorFunc) Run ¶
func (e ExecutorFunc) Run(ctx context.Context, info ScenarioInfo) error
Run implements [Executor.Run].
type ExportOptions ¶
type ExportOptions struct {
// Directory to export histories (empty = disabled)
ExportHistoriesDir string
// Status filter: "failed", "terminated", "failed,terminated", "all"
ExportHistoriesFilter string
}
ExportOptions contains configuration for exporting scenario data.
type FileOrArgs ¶
type FuzzExecutor ¶
type FuzzExecutor struct {
// Must be specified, called once on startup, and determines how TestInputs will be used for
// iterations of the scenario. If a file is specified, it will be loaded and used as the input
// for every iteration. If generator args are specified, it will be invoked once per iteration
// and those inputs will be saved and then fed out to each iteration.
InitInputs func(context.Context, ScenarioInfo) FileOrArgs
DefaultConfiguration RunConfiguration
}
func (FuzzExecutor) GetDefaultConfiguration ¶
func (k FuzzExecutor) GetDefaultConfiguration() RunConfiguration
func (FuzzExecutor) Run ¶
func (k FuzzExecutor) Run(ctx context.Context, info ScenarioInfo) error
type GenericExecutor ¶
type GenericExecutor struct {
// Function to execute a single iteration of this scenario
Execute func(context.Context, *Run) error
}
func (*GenericExecutor) Run ¶
func (g *GenericExecutor) Run(ctx context.Context, info ScenarioInfo) error
type HasDefaultConfiguration ¶
type HasDefaultConfiguration interface {
GetDefaultConfiguration() RunConfiguration
}
HasDefaultConfiguration is an interface executors can implement to show their default configuration.
type KitchenSinkExecutor ¶
type KitchenSinkExecutor struct {
TestInput *kitchensink.TestInput
// Called once on start
PrepareTestInput func(context.Context, ScenarioInfo, *kitchensink.TestInput) error
// Called for each iteration. TestInput is copied entirely into KitchenSinkWorkflowOptions on
// each iteration.
UpdateWorkflowOptions func(context.Context, *Run, *KitchenSinkWorkflowOptions) error
}
func (KitchenSinkExecutor) Run ¶
func (k KitchenSinkExecutor) Run(ctx context.Context, info ScenarioInfo) error
type KitchenSinkWorkflowOptions ¶
type KitchenSinkWorkflowOptions struct {
Params *kitchensink.TestInput
StartOptions client.StartWorkflowOptions
}
type Resumable ¶
type Resumable interface {
// LoadState loads a snapshot into the executor's internal state.
//
// Implementations should pass a reference to a state variable to the loader function and assign to their internal state.
// Callers should call this function before invoking the executor's Run method.
LoadState(loader func(any) error) error
// Snapshot returns a snapshot of the executor's internal state. The returned value must be serializable.
//
// The serialization format should be supported by the caller of this function.
// Callers may call this function periodically to get a snapshot of the executor's state.
Snapshot() any
}
Optional interface that can be implemented by an Executor to allow it to be resumable.
type Run ¶
type Run struct {
// Do not mutate this, this is shared across the entire scenario
*ScenarioInfo
// Each run should have a unique iteration.
Iteration int
Logger *zap.SugaredLogger
// contains filtered or unexported fields
}
Run represents an individual scenario run. A single instance of a scenario may contain many runs, and there may be many instances of a scenario.
func (*Run) DefaultKitchenSinkWorkflowOptions ¶
func (r *Run) DefaultKitchenSinkWorkflowOptions() KitchenSinkWorkflowOptions
DefaultKitchenSinkWorkflowOptions gets the default kitchen sink workflow options.
func (*Run) DefaultStartWorkflowOptions ¶
func (r *Run) DefaultStartWorkflowOptions() client.StartWorkflowOptions
DefaultStartWorkflowOptions gets the default start workflow options.
func (*Run) ExecuteAnyWorkflow ¶
func (r *Run) ExecuteAnyWorkflow(ctx context.Context, options client.StartWorkflowOptions, workflow any, valuePtr any, args ...any) error
ExecuteAnyWorkflow wraps calls to the client executing workflows to include some logging, returning an error if the execution fails.
func (*Run) ExecuteKitchenSinkWorkflow ¶
func (r *Run) ExecuteKitchenSinkWorkflow(ctx context.Context, options *KitchenSinkWorkflowOptions) error
ExecuteKitchenSinkWorkflow starts the generic "kitchen sink" workflow and waits for its completion, ignoring its result. Concurrently, it performs any client actions specified in kitchensink.TestInput.ClientSequence.
func (*Run) ShouldRetry ¶
ShouldRetry determines if another attempt should be made. It returns the backoff duration to wait before retrying and a boolean indicating whether a retry should occur.
type RunConfiguration ¶
type RunConfiguration struct {
// Number of iterations to run of this scenario (mutually exclusive with Duration).
Iterations int
// StartFromIteration is the iteration to start from when resuming a run.
// This is used to skip iterations that have already been run.
// Default is zero. If Iterations is also set, StartFromIteration must be less than or equal to Iterations.
StartFromIteration int
// MaxIterationAttempts is the maximum number of attempts to run the scenario.
// Default (and minimum) is 1.
MaxIterationAttempts int
// Duration limit of this scenario (mutually exclusive with Iterations). If neither iterations
// nor duration is set, default is DefaultIterations. When the Duration is elapsed, no new
// iterations will be started, but we will wait for any currently running iterations to
// complete.
Duration time.Duration
// Maximum number of instances of the Execute method to run concurrently.
// Default is DefaultMaxConcurrentIterations.
MaxConcurrent int
// MaxIterationsPerSecond is the maximum number of iterations to run per second.
// Default is zero, meaning unlimited.
MaxIterationsPerSecond float64
// Timeout is the maximum amount of time we'll wait for the scenario to finish running.
// If the timeout is hit any pending executions will be cancelled and the scenario will exit
// with an error. The default is unlimited.
Timeout time.Duration
// Do not register the default search attributes used by scenarios. If the SAs are not registered
// by the run, they must be registered by some other method. This is needed because cloud cells
// cannot use the SDK to register SAs; instead, the SAs must be registered through the control plane.
// Default is false.
DoNotRegisterSearchAttributes bool
// IgnoreAlreadyStarted, if set, will not error when a workflow with the same ID already exists.
// Default is false.
IgnoreAlreadyStarted bool
// OnCompletion, if set, is invoked after each successful iteration completes.
OnCompletion func(context.Context, *Run)
// HandleExecuteError, if set, is called when Execute returns an error, allowing transformation of errors.
HandleExecuteError func(context.Context, *Run, error) error
}
func (*RunConfiguration) ApplyDefaults ¶
func (r *RunConfiguration) ApplyDefaults()
func (RunConfiguration) Validate ¶
func (r RunConfiguration) Validate() error
type Scenario ¶
func GetScenario ¶
GetScenario gets a scenario by name from the global static registry.
type ScenarioInfo ¶
type ScenarioInfo struct {
// Name of the scenario (inferred from the file name)
ScenarioName string
// Run ID of the current scenario run, used to generate a unique task queue
// and workflow ID prefix. This is a single value for the whole scenario, and
// not a Workflow RunId.
RunID string
// ExecutionID is a randomly generated ID that uniquely identifies this particular
// execution of the scenario. Combined with RunID, it ensures no two executions collide.
ExecutionID string
// Metrics component for registering new metrics.
MetricsHandler client.MetricsHandler
// A zap logger.
Logger *zap.SugaredLogger
// A Temporal client.
Client client.Client
// Configuration info passed by user if any.
Configuration RunConfiguration
// ScenarioOptions are info passed from the command line. Do not mutate these.
ScenarioOptions map[string]string
// The namespace that was used when connecting the client.
Namespace string
// Path to the root of the omes dir
RootPath string
// ExportOptions contains export-related configuration
ExportOptions ExportOptions
}
ScenarioInfo contains information about the scenario under execution.
func (*ScenarioInfo) NewRun ¶
func (s *ScenarioInfo) NewRun(iteration int) *Run
NewRun creates a new run.
func (*ScenarioInfo) RegisterDefaultSearchAttributes ¶
func (s *ScenarioInfo) RegisterDefaultSearchAttributes(ctx context.Context) error
func (*ScenarioInfo) ScenarioOptionBool ¶
func (s *ScenarioInfo) ScenarioOptionBool(name string, defaultValue bool) bool
func (*ScenarioInfo) ScenarioOptionDuration ¶
func (*ScenarioInfo) ScenarioOptionFloat ¶
func (s *ScenarioInfo) ScenarioOptionFloat(name string, defaultValue float64) float64
func (*ScenarioInfo) ScenarioOptionInt ¶
func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int
func (*ScenarioInfo) ScenarioOptionString ¶
func (s *ScenarioInfo) ScenarioOptionString(name string, defaultValue string) string
type SleepActivityConfig ¶
type SleepActivityConfig struct {
// Distribution of how many sleep activities to run per iteration. Required.
Count *DistributionField[int64] `json:"count"`
// Map of groups to their configuration. Required.
Groups map[string]SleepActivityGroupConfig `json:"groups"`
}
SleepActivityConfig defines the configuration for sleep activities with flexible distribution support
func ParseAndValidateSleepActivityConfig ¶
func ParseAndValidateSleepActivityConfig(jsonStr string, requireCount bool) (*SleepActivityConfig, error)
func (*SleepActivityConfig) Sample ¶
func (config *SleepActivityConfig) Sample(rng *rand.Rand) []*kitchensink.ExecuteActivityAction
Sample generates a list of sleep activity actions (kitchensink.ExecuteActivityAction) based on the SleepActivityConfig.
type SleepActivityGroupConfig ¶
type SleepActivityGroupConfig struct {
// Weight for this group, used for sampling. Defaults to 1.
Weight int `json:"weight"`
// Distribution for sleep duration within this group. Required.
SleepDuration *DistributionField[time.Duration] `json:"sleepDuration"`
// Distribution for priority keys within this group. Optional.
PriorityKeys *DistributionField[int64] `json:"priorityKeys"`
// Distribution for fairness keys within this group. Optional.
FairnessKeys *DistributionField[int64] `json:"fairnessKeys"`
// Distribution for fairness weight within this group. Optional.
FairnessWeight *DistributionField[float32] `json:"fairnessWeight"`
}
SleepActivityGroupConfig defines a group configuration.