streamingpromql

package v1.3.1-0...-c6d9dcd
Published: Mar 2, 2026 License: AGPL-3.0 Imports: 45 Imported by: 0

README

Mimir Query Engine

This file contains a brief overview of the internals of the Mimir Query Engine, aka MQE.

For an introduction to the engine itself and the problems it tries to solve, check out this PromCon 2023 talk.

The goal of the engine is to allow evaluating queries over millions of series in a safe, performant and cost-effective way. To allow this, the engine aims to ensure that peak memory consumption of queriers is not proportional to the number of series selected. This will make it safe for operators to loosen the various query-related limits without risking the stability of their Mimir cluster or needing to devote enormous amounts of compute resources to queriers.

The key way the engine achieves this is by not loading all the input series into memory at once, and instead streaming them into memory when needed.

For example, let's say we're evaluating the query sum by (environment) (some_metric{cluster="cluster-1"}).

Prometheus' PromQL engine will first load all samples for all series selected by some_metric{cluster="cluster-1"} into memory. It will then compute the sum for each unique value of environment. At its peak, Prometheus' PromQL engine will hold all samples for all input series (from some_metric{cluster="cluster-1"}) and all samples for all output series in memory at once.

MQE will instead execute the selector some_metric{cluster="cluster-1"} and gather the labels of all series returned. With these labels, it will then compute all the possible output series for the sum by (environment) operation (i.e. one output series per unique value of environment). Having computed the output series, it will then begin reading series from the selector, one at a time, and update the running total for the appropriate output series. At its peak, MQE in this example will hold all samples for one input series and all samples for all output series in memory at once[^1], a significant reduction compared to Prometheus' PromQL engine, particularly when the selector selects many series.

This idea of streaming can be applied at multiple levels as well. Imagine we're evaluating the query max(sum by (environment) (some_metric{cluster="cluster-1"})). In MQE, once each group series produced by sum is complete, it is passed to max, which updates the running maximum seen so far across all groups. At its peak, MQE will hold all samples for one input series, all samples for all incomplete sum group series, and the single incomplete max output series in memory at once.

Internals

Within MQE, a query is represented by a set of linked operators (one for each operation) that together form the query plan.

For example, the max(sum by (environment) (some_metric{cluster="cluster-1"})) example from before would have a query plan made up of three operators:

  • The instant vector selector operator (some_metric{cluster="cluster-1"})
  • The sum aggregation operator (sum by (environment) (...)), which consumes series from the instant vector selector operator
  • The max aggregation operator (max (...)), which consumes series from the sum aggregation operator

Visually, the plan looks like this:

flowchart TB
    IVS["`**instant vector selector**
    some_metric#123;cluster=#quot;cluster-1#quot;#125;`"]
    sum["`**sum aggregation**
    sum by (environment) (...)`"]
    max["`**max aggregation**
    max (...)`"]
    output((output))
    IVS --> sum
    sum --> max
    max --> output

Each of these operators satisfies the InstantVectorOperator interface, defined here. The two key methods of this interface are SeriesMetadata() and NextSeries():

SeriesMetadata() returns the list of all series' labels that will be returned by the operator[^2]. In our example, the instant vector selector operator would return all the matching some_metric series, and the sum aggregation operator would return one series for each unique value of environment.

NextSeries() is then called by the consuming operator to read each series' data, one series at a time. In our example, the sum aggregation operator would call NextSeries() on the instant vector selector operator to get the first series' data, then again to get the second series' data and so on.

Elaborating on the example from before, the overall query would proceed like this, assuming the request is received over HTTP:

  1. query HTTP API handler calls Engine.NewInstantQuery() or Engine.NewRangeQuery() as appropriate (source)
    1. engine parses PromQL expression using Prometheus' PromQL parser, producing an abstract syntax tree (AST) (source)
    2. engine converts AST produced by PromQL parser to query plan (source)
    3. engine returns created Query instance
  2. query HTTP API handler calls Query.Exec()
    1. Query.Exec() calls SeriesMetadata() on max aggregation operator
      1. max aggregation operator calls SeriesMetadata() on sum aggregation operator
        1. sum aggregation operator calls SeriesMetadata() on instant vector selector operator
          • instant vector selector operator issues Select() call, which retrieves labels from ingesters and store-gateways
        2. sum aggregation operator computes output series (one per unique value of environment) based on input series from instant vector selector
      2. max aggregation operator computes output series based on input series from sum aggregation operator
        • in this case, there's just one output series, given no grouping is being performed
    2. root of the query calls NextSeries() on max aggregation operator until all series have been returned
      1. max aggregation operator calls NextSeries() on sum aggregation operator
        1. sum aggregation operator calls NextSeries() on instant vector selector operator
          • instant vector selector returns samples for next series
        2. sum aggregation operator updates its running totals for the relevant output series
        3. if all input series have now been seen for the output series just updated, sum aggregation operator returns that output series and removes it from its internal state
        4. otherwise, it calls NextSeries() again and repeats
      2. max aggregation operator updates its running maximum based on the series returned
      3. if all input series have been seen, max aggregation operator returns
      4. otherwise, it calls NextSeries() again and repeats
  3. query HTTP API handler converts returned result to wire format (either JSON or Protobuf) and sends to caller
  4. query HTTP API handler calls Query.Close() to release remaining resources

[^1]: This isn't strictly correct, as chunks streaming will buffer chunks for some series in memory as they're received over the network, and it ignores the initial memory consumption caused by the non-streaming calls to SeriesMetadata(). But this applies equally to both engines when used in Mimir.

[^2]: This isn't done in a streaming fashion: all series' labels are loaded into memory at once. In a future iteration of the engine, SeriesMetadata() could be made streaming as well, but this is out of scope for now.

Implementation notes

Thread safety

Operators are not expected to be thread-safe: the engine currently evaluates queries from a single goroutine.

Native histograms and memory pooling

MQE makes extensive use of memory pooling to reduce GC pressure, including for slices that hold *histogram.FloatHistogram pointers.

Slices of promql.HPoint are not cleared when they are returned to types.HPointSlicePool. This allows the FloatHistogram instances they reference to be reused for other series or time steps. For example, when filling a HPointRingBuffer, range vector selectors will reuse FloatHistogram instances already present in the HPoint slice that backs the ring buffer, rather than unconditionally creating a new FloatHistogram instance.

The implication of this is that anywhere that returns a promql.HPoint slice s to types.HPointSlicePool must remove references in s to any FloatHistograms retained after s is returned. For example, binary operations that act as a filter retain the FloatHistograms that satisfy the filter in a new, smaller slice, and return the original unfiltered slice s to the pool.

The simplest way to do this is to set the H field on promql.HPoint to nil. If all points in s are being retained, then calling clear(s) is also sufficient.

If this is not done, query results may become corrupted due to multiple queries simultaneously modifying the same FloatHistogram instance. This can also manifest as panics while interacting with FloatHistograms.

The same problem does not apply to *histogram.FloatHistogram slices returned by types.HistogramSlicePool. Slices from this pool are used only by parts of MQE that do not benefit from reusing FloatHistogram instances, so types.HistogramSlicePool clears all returned slices for you.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DetermineLookbackDelta

func DetermineLookbackDelta(opts promql.EngineOpts) time.Duration

func TestMain

func TestMain(m *testing.M)

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(opts EngineOpts, metrics *stats.QueryMetrics, planner *QueryPlanner) (*Engine, error)

func NewEngineWithCache

func NewEngineWithCache(opts EngineOpts, metrics *stats.QueryMetrics, planner *QueryPlanner, intermediateCache *cache.CacheFactory) (*Engine, error)

func (*Engine) NewEvaluator

func (e *Engine) NewEvaluator(ctx context.Context, queryable storage.Queryable, opts promql.QueryOpts, params *planning.QueryParameters, nodeRequests []NodeEvaluationRequest) (*Evaluator, error)

func (*Engine) NewInstantQuery

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)

func (*Engine) NewRangeQuery

func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)

func (*Engine) RegisterNodeMaterializer

func (e *Engine) RegisterNodeMaterializer(nodeType planning.NodeType, materializer planning.NodeMaterializer) error

type EngineOpts

type EngineOpts struct {
	CommonOpts promql.EngineOpts `yaml:"-"`

	ActiveQueryTracker QueryTracker `yaml:"-"`
	Logger             log.Logger   `yaml:"-"`

	// When operating in pedantic mode, we panic if memory consumption is > 0 after Query.Close()
	// (indicating something was not returned to a pool).
	// Should only be used in tests.
	Pedantic bool `yaml:"-"`

	// Prometheus' engine evaluates all selectors (ie. calls Querier.Select()) before evaluating any part of the query.
	// We rely on this behavior in query-frontends when evaluating shardable queries so that all selectors are evaluated in parallel.
	// When sharding is just another optimization pass, we'll be able to trigger this eager loading from the sharding operator,
	// but for now, we use this option to change the behavior of selectors.
	EagerLoadSelectors bool `yaml:"-"`

	Limits QueryLimitsProvider `yaml:"-"`

	EnablePruneToggles                   bool `yaml:"enable_prune_toggles" category:"experimental"`
	EnableCommonSubexpressionElimination bool `yaml:"enable_common_subexpression_elimination" category:"experimental"`
	EnableSubsetSelectorElimination      bool `yaml:"enable_subset_selector_elimination" category:"experimental"`
	EnableNarrowBinarySelectors          bool `yaml:"enable_narrow_binary_selectors" category:"experimental"`
	EnableEliminateDeduplicateAndMerge   bool `yaml:"enable_eliminate_deduplicate_and_merge" category:"experimental"`
	EnableReduceMatchers                 bool `yaml:"enable_reduce_matchers" category:"experimental"`
	EnableProjectionPushdown             bool `yaml:"enable_projection_pushdown" category:"experimental"`
	EnableMultiAggregation               bool `yaml:"enable_multi_aggregation" category:"experimental"`

	RangeVectorSplitting RangeVectorSplittingConfig `yaml:"range_vector_splitting" category:"experimental"`
}

func NewTestEngineOpts

func NewTestEngineOpts() EngineOpts

func (*EngineOpts) RegisterFlags

func (o *EngineOpts) RegisterFlags(f *flag.FlagSet)

func (*EngineOpts) Validate

func (o *EngineOpts) Validate() error

type ErrAnchoredIncompatibleFunction

type ErrAnchoredIncompatibleFunction struct {
	// contains filtered or unexported fields
}

func (ErrAnchoredIncompatibleFunction) Error

type ErrSmoothedIncompatibleFunction

type ErrSmoothedIncompatibleFunction struct {
	// contains filtered or unexported fields
}

func (ErrSmoothedIncompatibleFunction) Error

type EvaluationObserver

type EvaluationObserver interface {
	// SeriesMetadataEvaluated notifies this observer when series metadata has been evaluated.
	// Implementations of this method are responsible for returning the series slice to the pool when it is no longer needed.
	// Implementations of this method may mutate the series slice before returning it to the pool.
	SeriesMetadataEvaluated(ctx context.Context, evaluator *Evaluator, node planning.Node, series []types.SeriesMetadata) error

	// InstantVectorSeriesDataEvaluated notifies this observer when samples for an instant vector series have been evaluated.
	// Implementations of this method are responsible for returning seriesData to the pool when it is no longer needed.
	// Implementations of this method may mutate seriesData before returning it to the pool.
	InstantVectorSeriesDataEvaluated(ctx context.Context, evaluator *Evaluator, node planning.Node, seriesIndex int, seriesCount int, seriesData types.InstantVectorSeriesData) error

	// RangeVectorStepSamplesEvaluated notifies this observer when samples for a range vector step have been evaluated.
	// Implementations of this method must not mutate stepData, and should copy any data they wish to retain from stepData before returning.
	RangeVectorStepSamplesEvaluated(ctx context.Context, evaluator *Evaluator, node planning.Node, seriesIndex int, stepIndex int, stepData *types.RangeVectorStepData) error

	// ScalarEvaluated notifies this observer when a scalar has been evaluated.
	// Implementations of this method are responsible for returning data to the pool when it is no longer needed.
	// Implementations of this method may mutate data before returning it to the pool.
	ScalarEvaluated(ctx context.Context, evaluator *Evaluator, node planning.Node, data types.ScalarData) error

	// StringEvaluated notifies this observer when a string has been evaluated.
	StringEvaluated(ctx context.Context, evaluator *Evaluator, node planning.Node, data string) error

	// EvaluationCompleted notifies this observer when evaluation is complete.
	EvaluationCompleted(ctx context.Context, evaluator *Evaluator, annotations *annotations.Annotations, stats *types.QueryStats) error
}

type Evaluator

type Evaluator struct {
	MemoryConsumptionTracker *limiter.MemoryConsumptionTracker
	// contains filtered or unexported fields
}

func NewEvaluator

func NewEvaluator(nodeRequests []NodeEvaluationRequest, params *planning.OperatorParameters, engine *Engine, originalExpression string) (*Evaluator, error)

func (*Evaluator) Cancel

func (e *Evaluator) Cancel()

func (*Evaluator) Close

func (e *Evaluator) Close()

func (*Evaluator) Evaluate

func (e *Evaluator) Evaluate(ctx context.Context, observer EvaluationObserver) (err error)

Evaluate evaluates the query.

Evaluate will always call observer.EvaluationCompleted before returning nil. It may return a non-nil error after calling observer.EvaluationCompleted if observer.EvaluationCompleted returned a non-nil error.

type NodeEvaluationRequest

type NodeEvaluationRequest struct {
	Node      planning.Node
	TimeRange types.QueryTimeRange
	// contains filtered or unexported fields
}

type NoopPlanningObserver

type NoopPlanningObserver struct{}

func (NoopPlanningObserver) OnASTStageComplete

func (n NoopPlanningObserver) OnASTStageComplete(string, parser.Expr, time.Duration) error

func (NoopPlanningObserver) OnAllASTStagesComplete

func (n NoopPlanningObserver) OnAllASTStagesComplete(parser.Expr) error

func (NoopPlanningObserver) OnAllPlanningStagesComplete

func (n NoopPlanningObserver) OnAllPlanningStagesComplete(*planning.QueryPlan) error

func (NoopPlanningObserver) OnPlanningStageComplete

func (n NoopPlanningObserver) OnPlanningStageComplete(string, *planning.QueryPlan, time.Duration) error

type NoopQueryTracker

type NoopQueryTracker struct{}

func (*NoopQueryTracker) Close

func (n *NoopQueryTracker) Close() error

func (*NoopQueryTracker) Delete

func (n *NoopQueryTracker) Delete(_ int)

func (*NoopQueryTracker) GetMaxConcurrent

func (n *NoopQueryTracker) GetMaxConcurrent() int

func (*NoopQueryTracker) InsertWithDetails

func (n *NoopQueryTracker) InsertWithDetails(ctx context.Context, query string, stage string, includeTimeRange bool, timeRange types.QueryTimeRange) (int, error)

type PlanningObserver

type PlanningObserver interface {
	OnASTStageComplete(stageName string, updatedExpr parser.Expr, duration time.Duration) error
	OnAllASTStagesComplete(finalExpr parser.Expr) error
	OnPlanningStageComplete(stageName string, updatedPlan *planning.QueryPlan, duration time.Duration) error
	OnAllPlanningStagesComplete(finalPlan *planning.QueryPlan) error
}

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query represents a top-level query. It acts as a bridge from the querying interface Prometheus expects into how MQE operates.

func (*Query) Cancel

func (q *Query) Cancel()

func (*Query) Close

func (q *Query) Close()

func (*Query) EvaluationCompleted

func (q *Query) EvaluationCompleted(_ context.Context, _ *Evaluator, annotations *annotations.Annotations, stats *types.QueryStats) error

EvaluationCompleted implements the EvaluationObserver interface.

func (*Query) Exec

func (q *Query) Exec(ctx context.Context) (res *promql.Result)

func (*Query) InstantVectorSeriesDataEvaluated

func (q *Query) InstantVectorSeriesDataEvaluated(_ context.Context, _ *Evaluator, _ planning.Node, seriesIndex int, _ int, seriesData types.InstantVectorSeriesData) error

InstantVectorSeriesDataEvaluated implements the EvaluationObserver interface.

func (*Query) RangeVectorStepSamplesEvaluated

func (q *Query) RangeVectorStepSamplesEvaluated(_ context.Context, _ *Evaluator, _ planning.Node, seriesIndex int, stepIndex int, stepData *types.RangeVectorStepData) error

RangeVectorStepSamplesEvaluated implements the EvaluationObserver interface.

func (*Query) ScalarEvaluated

func (q *Query) ScalarEvaluated(_ context.Context, _ *Evaluator, _ planning.Node, data types.ScalarData) error

ScalarEvaluated implements the EvaluationObserver interface.

func (*Query) SeriesMetadataEvaluated

func (q *Query) SeriesMetadataEvaluated(_ context.Context, _ *Evaluator, _ planning.Node, series []types.SeriesMetadata) error

SeriesMetadataEvaluated implements the EvaluationObserver interface.

func (*Query) Statement

func (q *Query) Statement() parser.Statement

func (*Query) Stats

func (q *Query) Stats() *promstats.Statistics

func (*Query) String

func (q *Query) String() string

func (*Query) StringEvaluated

func (q *Query) StringEvaluated(_ context.Context, _ *Evaluator, _ planning.Node, data string) error

StringEvaluated implements the EvaluationObserver interface.

type QueryLimitsProvider

type QueryLimitsProvider interface {
	// GetMaxEstimatedMemoryConsumptionPerQuery returns the maximum estimated memory allowed to be consumed by a query in bytes, or 0 to disable the limit.
	GetMaxEstimatedMemoryConsumptionPerQuery(ctx context.Context) (uint64, error)
	// GetEnableDelayedNameRemoval indicates if the experimental feature for delayed name removal should be enabled.
	GetEnableDelayedNameRemoval(ctx context.Context) (bool, error)
	// GetMaxOutOfOrderTimeWindow returns the out-of-order time window for the tenant(s) in the context.
	GetMaxOutOfOrderTimeWindow(ctx context.Context) (time.Duration, error)
	// GetMinResultsCacheTTL returns the TTL for cached results for the tenant(s) in the context.
	GetMinResultsCacheTTL(ctx context.Context) (time.Duration, error)
}

type QueryPlanVersionProvider

type QueryPlanVersionProvider interface {
	GetMaximumSupportedQueryPlanVersion(ctx context.Context) (planning.QueryPlanVersion, error)
}

func NewMaximumSupportedVersionQueryPlanVersionProvider

func NewMaximumSupportedVersionQueryPlanVersionProvider() QueryPlanVersionProvider

NewMaximumSupportedVersionQueryPlanVersionProvider returns a QueryPlanVersionProvider that always returns the maximum supported query plan version. This is intended to be used only in tests.

func NewStaticQueryPlanVersionProvider

func NewStaticQueryPlanVersionProvider(version planning.QueryPlanVersion) QueryPlanVersionProvider

NewStaticQueryPlanVersionProvider returns a QueryPlanVersionProvider that always returns the given version. This is intended to be used only in tests.

type QueryPlanner

type QueryPlanner struct {

	// Replaced during testing to ensure timing produces consistent results.
	TimeSince func(time.Time) time.Duration
	// contains filtered or unexported fields
}

func NewQueryPlanner

func NewQueryPlanner(opts EngineOpts, versionProvider QueryPlanVersionProvider) (*QueryPlanner, error)

func NewQueryPlannerWithTime

func NewQueryPlannerWithTime(opts EngineOpts, versionProvider QueryPlanVersionProvider, timeNow func() time.Time) (*QueryPlanner, error)

NewQueryPlannerWithTime is like NewQueryPlanner but uses the given time function. Useful for tests that need a fixed "now" for OOO window calculations.

func NewQueryPlannerWithoutOptimizationPasses

func NewQueryPlannerWithoutOptimizationPasses(opts EngineOpts, versionProvider QueryPlanVersionProvider) (*QueryPlanner, error)

NewQueryPlannerWithoutOptimizationPasses creates a new query planner without any optimization passes registered.

This is intended for use in tests only.

func (*QueryPlanner) NewQueryPlan

func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange types.QueryTimeRange, enableDelayedNameRemoval bool, observer PlanningObserver) (*planning.QueryPlan, error)

func (*QueryPlanner) ParseAndApplyASTOptimizationPasses

func (p *QueryPlanner) ParseAndApplyASTOptimizationPasses(ctx context.Context, qs string, timeRange types.QueryTimeRange, observer PlanningObserver) (parser.Expr, error)

ParseAndApplyASTOptimizationPasses runs the AST optimization passes on the input string and outputs an expression and any error encountered. This is separated into its own method to allow testing of AST optimization passes.

func (*QueryPlanner) RegisterASTOptimizationPass

func (p *QueryPlanner) RegisterASTOptimizationPass(o optimize.ASTOptimizationPass)

RegisterASTOptimizationPass registers an AST optimization pass used with this engine.

This method is not thread-safe and must not be called concurrently with any other method on this type.

func (*QueryPlanner) RegisterQueryPlanOptimizationPass

func (p *QueryPlanner) RegisterQueryPlanOptimizationPass(o optimize.QueryPlanOptimizationPass)

RegisterQueryPlanOptimizationPass registers a query plan optimization pass used with this engine.

This method is not thread-safe and must not be called concurrently with any other method on this type.

type QueryTracker

type QueryTracker interface {
	InsertWithDetails(ctx context.Context, query string, stage string, includeTimeRange bool, timeRange types.QueryTimeRange) (int, error)

	Delete(insertIndex int)
}

QueryTracker is like promql.QueryTracker, but includes more information about the query.

type RangeVectorSplittingConfig

type RangeVectorSplittingConfig struct {
	Enabled bool `yaml:"enabled" category:"experimental"`

	// SplitInterval is the time interval used for splitting.
	// Must be greater than 0. Defaults to 2 hours if not specified.
	SplitInterval time.Duration `yaml:"split_interval" category:"experimental"`

	// IntermediateResultsCache configures caching of intermediate results from split queries.
	// TODO: consider making the cache an optional part of query splitting. We might want to just do query splitting
	//  without caching (e.g. possibly if splitting is extended to range queries in the future, or if we add
	//  parallelisation and just want to use query splitting for that and not cache).
	IntermediateResultsCache cache.Config `yaml:"intermediate_results_cache" category:"experimental"`
}

RangeVectorSplittingConfig configures the splitting of queries containing functions over range vectors.

func (*RangeVectorSplittingConfig) RegisterFlags

func (c *RangeVectorSplittingConfig) RegisterFlags(f *flag.FlagSet)

func (*RangeVectorSplittingConfig) Validate

func (c *RangeVectorSplittingConfig) Validate() error

type StaticQueryLimitsProvider

type StaticQueryLimitsProvider struct {
	MaxEstimatedMemoryConsumptionPerQuery uint64
	EnableDelayedNameRemoval              bool
	MaxOutOfOrderTimeWindow               time.Duration
	MinResultsCacheTTL                    time.Duration
}

func NewStaticQueryLimitsProvider

func NewStaticQueryLimitsProvider() StaticQueryLimitsProvider

NewStaticQueryLimitsProvider returns a QueryLimitsProvider that always returns the provided limits. This should generally only be used in tests.

func (StaticQueryLimitsProvider) GetEnableDelayedNameRemoval

func (p StaticQueryLimitsProvider) GetEnableDelayedNameRemoval(_ context.Context) (bool, error)

func (StaticQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery

func (p StaticQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery(_ context.Context) (uint64, error)

func (StaticQueryLimitsProvider) GetMaxOutOfOrderTimeWindow

func (p StaticQueryLimitsProvider) GetMaxOutOfOrderTimeWindow(_ context.Context) (time.Duration, error)

func (StaticQueryLimitsProvider) GetMinResultsCacheTTL

func (p StaticQueryLimitsProvider) GetMinResultsCacheTTL(_ context.Context) (time.Duration, error)
