utils

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2026 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultArenaSize is the default allocation size for in-memory indexes.
	DefaultArenaSize = int64(64 << 20)

	MaxNodeSize = int(unsafe.Sizeof(node{}))
)
View Source
const (
	// MaxLevelNum _
	MaxLevelNum = 7
	// DefaultValueThreshold _
	DefaultValueThreshold = 1024
)
View Source
const (
	ManifestFilename                  = "MANIFEST"
	ManifestRewriteFilename           = "REWRITEMANIFEST"
	ManifestDeletionsRewriteThreshold = 10000
	ManifestDeletionsRatio            = 10
	DefaultFileFlag                   = os.O_RDWR | os.O_CREATE | os.O_APPEND
	DefaultFileMode                   = 0666
	MaxValueLogSize                   = 10 << 20

	// MaxHeaderSize is the worst-case size for uvarint encoding.
	MaxHeaderSize            = 21
	VlogHeaderSize           = 0
	MaxVlogFileSize   uint32 = math.MaxUint32
	Mi                int64  = 1 << 20
	KVWriteChCapacity        = 1000
)

file

Variables

View Source
var (
	MagicText    = [4]byte{'N', 'O', 'K', 'V'}
	MagicVersion = uint32(1)
)

codec

View Source
var (
	// ErrValueLogSize is returned when opt.ValueLogFileSize option is not within the valid
	// range.
	ErrValueLogSize = errors.New("Invalid ValueLogFileSize, must be in range [1MB, 2GB)")

	// ErrKeyNotFound is returned when key isn't found on a txn.Get.
	ErrKeyNotFound = errors.New("Key not found")
	// ErrReWriteFailure reWrite failure
	ErrReWriteFailure = errors.New("reWrite failure")
	// ErrBadMagic bad magic
	ErrBadMagic = errors.New("bad magic")
	// ErrBadChecksum bad check sum
	ErrBadChecksum = errors.New("bad check sum")
	// ErrChecksumMismatch is returned at checksum mismatch.
	ErrChecksumMismatch = errors.New("checksum mismatch")

	ErrTruncate = errors.New("Do truncate")
	ErrStop     = errors.New("Stop")

	// ErrTxnTooBig is returned if too many writes are fit into a single transaction.
	ErrTxnTooBig      = errors.New("Txn is too big to fit into one request")
	ErrDeleteVlogFile = errors.New("Delete vlog file")

	// ErrConflict is returned when a transaction conflicts with another transaction. This can
	// happen if the read rows had been updated concurrently by another transaction.
	ErrConflict = errors.New("Transaction Conflict. Please retry")

	// ErrReadOnlyTxn is returned if an update function is called on a read-only transaction.
	ErrReadOnlyTxn = errors.New("No sets or deletes are allowed in a read-only transaction")

	// ErrDiscardedTxn is returned if a previously discarded transaction is re-used.
	ErrDiscardedTxn = errors.New("This transaction has been discarded. Create a new one")

	// ErrEmptyKey is returned if an empty key is passed on an update function.
	ErrEmptyKey = errors.New("Key cannot be empty")

	// ErrNoRewrite is returned if a call for value log GC doesn't result in a log file rewrite.
	ErrNoRewrite = errors.New(
		"Value log GC attempt didn't result in any cleanup")

	// ErrRejected is returned if a value log GC is called either while another GC is running, or
	// after DB::Close has been called.
	ErrRejected = errors.New("Value log GC request rejected")

	// ErrInvalidRequest is returned if the user request is invalid.
	ErrInvalidRequest = errors.New("Invalid request")

	// ErrManagedTxn is returned if the user tries to use an API which isn't
	// allowed due to external management of transactions, when using ManagedDB.
	ErrManagedTxn = errors.New(
		"Invalid API request. Not allowed to perform this action using ManagedDB")

	// ErrTruncateNeeded is returned when the value log gets corrupt, and requires truncation of
	// corrupt data to allow to run properly.
	ErrTruncateNeeded = errors.New(
		"Log truncate required to run DB. This might result in data loss")

	// ErrBlockedWrites is returned if the user called DropAll. During the process of dropping all
	// data
	ErrBlockedWrites = errors.New("Writes are blocked, possibly due to DropAll or Close")

	// ErrDBClosed is returned when a get operation is performed after closing the DB.
	ErrDBClosed = errors.New("DB Closed")

	ErrFillTables = errors.New("fill tables")

	// ErrHotKeyWriteThrottle indicates that a key exceeded the configured write hot-key limit.
	ErrHotKeyWriteThrottle = errors.New("hot key write throttled")

	// ErrMissingManifestOrWAL indicates WAL and manifest must be provided together for raftstore durability.
	ErrMissingManifestOrWAL = errors.New("raftstore: WAL and manifest must both be provided")

	// ErrPartialRecord indicates that a WAL record ended prematurely (typically due to EOF/corruption).
	ErrPartialRecord = errors.New("wal: partial record")
	// ErrEmptyRecord indicates that a WAL record header advertised zero payload length.
	ErrEmptyRecord = errors.New("wal: empty record")
)

ErrKeyNotFound indicates a missing key.

Functions

func AssertTrue

func AssertTrue(b bool)

AssertTrue asserts that b is true; otherwise it logs a fatal error.

func AssertTruef

func AssertTruef(b bool, format string, args ...any)

AssertTruef is AssertTrue with extra info.

func BloomBitsPerKey

func BloomBitsPerKey(numEntries int, fp float64) int

BloomBitsPerKey returns the number of bits per key required by a Bloom filter to achieve the given false positive rate.

func BloomInsert added in v0.6.1

func BloomInsert(filter []byte, h uint32)

BloomInsert mutates filter to include hash.

func BloomKForBitsPerKey added in v0.6.1

func BloomKForBitsPerKey(bitsPerKey int) uint8

BloomKForBitsPerKey maps bits-per-key to the number of bloom probes.

func BloomMayContain added in v0.6.1

func BloomMayContain(filter []byte, h uint32) bool

BloomMayContain checks if hash may exist in the encoded bloom filter.

func BuildEntry

func BuildEntry() *kv.Entry

BuildEntry constructs a random entry for tests.

func CalculateChecksum

func CalculateChecksum(data []byte) uint64

CalculateChecksum returns the checksum of data as a uint64.

func Check

func Check(err error)

func CompareKeys

func CompareKeys(key1, key2 []byte) int

CompareKeys compares the keys without their timestamps first, and compares the timestamps only if the timestamp-less keys are the same. A plain bytes.Compare is not used because it would sort a<timestamp> higher than aa<timestamp>. All keys should have a timestamp.

func CompareUserKeys added in v0.4.2

func CompareUserKeys(key1, key2 []byte) int

CompareUserKeys compares keys ignoring any internal timestamp suffix. It accepts either internal keys or raw user keys.

func CondPanic

func CondPanic(condition bool, err error)

CondPanic panics with err if condition is true.

func CondPanicFunc added in v0.4.0

func CondPanicFunc(condition bool, errFn func() error)

CondPanicFunc defers error construction until the condition is true, avoiding allocations on the hot path.

func CreateSyncedFile

func CreateSyncedFile(fs vfs.FS, filename string, sync bool) (vfs.File, error)

CreateSyncedFile creates a new file (using O_EXCL) and returns an error if the file already exists.

func Err

func Err(err error) error

Err err

func FID

func FID(name string) uint64

FID parses the file ID from an sstable filename.

func FastRand

func FastRand() uint32

FastRand is a fast thread local random function.

func FileNameSSTable

func FileNameSSTable(dir string, id uint64) string

FileNameSSTable returns the SSTable filename for the given ID.

func Float64

func Float64() float64

func GetOrCreateInt added in v0.4.0

func GetOrCreateInt(name string) *expvar.Int

GetOrCreateInt is exported for reuse.

func Hash

func Hash(b []byte) uint32

Hash implements a hashing algorithm similar to the Murmur hash.

func Int63n

func Int63n(n int64) int64

func LoadIDMap

func LoadIDMap(fs vfs.FS, dir string) map[uint64]struct{}

LoadIDMap returns the set of IDs of all SST files found in dir.

func Panic

func Panic(err error)

Panic panics when err is non-nil.

func Panic2

func Panic2(_ any, err error)

Panic2 _

func RandN

func RandN(n int) int

func RemoveDir

func RemoveDir(fs vfs.FS, dir string)

RemoveDir _

func SyncDir

func SyncDir(fs vfs.FS, dir string) error

SyncDir syncs the directory dir. When you create or delete a file, you have to ensure the directory entry for the file is synced in order to guarantee the file is visible (if the system crashes). (See the man page for fsync, or see https://github.com/coreos/etcd/issues/6368 for an example.)

func VerifyChecksum

func VerifyChecksum(data []byte, expected []byte) error

VerifyChecksum verifies data against the expected CRC32 checksum and returns an error if verification fails.

func VlogFilePath

func VlogFilePath(dirPath string, fid uint32) string

func WarpErr

func WarpErr(format string, err error) error

WarpErr is kept for backward compatibility. Deprecated: use WrapErr.

func Wrap

func Wrap(err error, msg string) error

func WrapErr added in v0.6.1

func WrapErr(format string, err error) error

WrapErr logs and returns err with caller location metadata.

func Wrapf

func Wrapf(err error, format string, args ...any) error

Wrapf is Wrap with extra info.

Types

type ART added in v0.4.2

type ART struct {
	// contains filtered or unexported fields
}

ART implements an adaptive radix tree for memtable indexing. Concurrency model: copy-on-write nodes with CAS installs; reads are lock-free and observe immutable nodes.

func NewART added in v0.4.2

func NewART(arenaSize int64) *ART

NewART creates a new adaptive radix tree with a default arena size.

func (*ART) Add added in v0.4.2

func (a *ART) Add(entry *kv.Entry)

Add inserts or replaces the entry.

func (*ART) DecrRef added in v0.4.2

func (a *ART) DecrRef()

DecrRef decrements the reference counter and releases the tree when it reaches zero. It panics on refcount underflow (decrement past zero) which indicates a bug in the caller's lifetime management.

func (*ART) IncrRef added in v0.4.2

func (a *ART) IncrRef()

IncrRef increments the reference counter.

func (*ART) MemSize added in v0.4.2

func (a *ART) MemSize() int64

MemSize returns an approximate memory footprint.

func (*ART) NewIterator added in v0.4.2

func (a *ART) NewIterator(_ *Options) Iterator

NewIterator returns a tree iterator. Options are ignored for now.

func (*ART) Search added in v0.4.2

func (a *ART) Search(key []byte) kv.ValueStruct

Search returns the value for the earliest key >= target with the same user key.

type AccessPattern added in v0.4.0

type AccessPattern int

AccessPattern hints the OS about expected page access behaviour for a mapped file. It is deliberately minimal to avoid leaking platform-specific flags.

const (
	AccessPatternAuto AccessPattern = iota
	AccessPatternNormal
	AccessPatternSequential
	AccessPatternRandom
	AccessPatternWillNeed
	AccessPatternDontNeed
)

type Arena

type Arena struct {
	// contains filtered or unexported fields
}

Arena should be lock-free.

type Closer

type Closer struct {
	CloseSignal chan struct{}
	// contains filtered or unexported fields
}

Closer coordinates shutdown and resource cleanup.

func NewCloser

func NewCloser() *Closer

NewCloser returns a Closer with an open CloseSignal channel.

func NewCloserInitial

func NewCloserInitial(initial int) *Closer

NewCloserInitial creates a closer with an initial WaitGroup count and a dedicated close signal/context pair for cooperative shutdown.

func (*Closer) Add

func (c *Closer) Add(n int)

Add adjusts the WaitGroup counter.

func (*Closer) Close

func (c *Closer) Close()

Close signals downstream goroutines and waits for them to finish.

func (*Closer) Done

func (c *Closer) Done()

Done marks a goroutine as finished.

func (*Closer) HasBeenClosed

func (c *Closer) HasBeenClosed() <-chan struct{}

func (*Closer) Signal

func (c *Closer) Signal()

func (*Closer) SignalAndWait

func (c *Closer) SignalAndWait()

func (*Closer) Wait

func (c *Closer) Wait()

type DirLock

type DirLock struct {
	// contains filtered or unexported fields
}

DirLock represents an exclusive filesystem lock on a directory.

func AcquireDirLock

func AcquireDirLock(dir string, fs vfs.FS) (*DirLock, error)

AcquireDirLock attempts to obtain an exclusive lock on the provided directory. The lock is implemented using a platform flock on a dedicated LOCK file. The returned DirLock must be released via (*DirLock).Release.

func (*DirLock) Release

func (l *DirLock) Release() error

Release unlocks the directory and removes the lock file.

type Filter

type Filter []byte

Filter is an encoded set of []byte keys.

func NewFilter

func NewFilter(keys []uint32, bitsPerKey int) Filter

NewFilter returns a new Bloom filter that encodes a set of keys, supplied as their uint32 hashes, with the given number of bits per key, approximately.

A good bitsPerKey value is 10, which yields a filter with ~ 1% false positive rate.

func (Filter) MayContain

func (f Filter) MayContain(h uint32) bool

MayContain reports whether the filter may contain the key with the given hash. False positives are possible, where it returns true for keys not in the original set.

func (Filter) MayContainKey

func (f Filter) MayContainKey(k []byte) bool

MayContainKey reports whether the filter may contain the key k.

type Item

type Item interface {
	Entry() *kv.Entry
}

Item is the element returned by an Iterator; it exposes the underlying *kv.Entry.

type Iterator

type Iterator interface {
	Next()
	Valid() bool
	Rewind()
	Item() Item
	Close() error
	Seek(key []byte)
}

Iterator abstracts key/value iteration.

type Options

type Options struct {
	Prefix []byte
	IsAsc  bool
	// OnlyUseKey instructs iterators to avoid materialising value log entries
	// eagerly. Callers should rely on Item.ValueCopy when value access is
	// required. This keeps the default behaviour (false) for existing users.
	OnlyUseKey bool
	// AccessPattern lets callers hint expected IO behaviour (sequential scans,
	// random point lookups, etc.) so the file layer can tune madvise settings.
	AccessPattern AccessPattern
	// PrefetchBlocks controls how many blocks ahead to prefetch eagerly. Zero
	// disables prefetch.
	PrefetchBlocks int
	// PrefetchWorkers optionally overrides worker count (defaults to
	// min(PrefetchBlocks,4)). Zero uses the default.
	PrefetchWorkers int
	// PrefetchAdaptive enables dynamic tuning of prefetch based on hit/miss stats.
	PrefetchAdaptive bool
	// Metrics tags for observability.
	MetricTag string
}

Options holds the settings passed to NewIterator when creating an iterator.

type Pool added in v0.4.0

type Pool struct {
	// contains filtered or unexported fields
}

Pool wraps ants.Pool with lightweight metrics.

func NewPool added in v0.4.0

func NewPool(size int, name string) *Pool

NewPool creates a pool with the given size. If size<=0, defaults to 1.

func (*Pool) Release added in v0.4.0

func (pl *Pool) Release()

Release frees resources.

func (*Pool) Size added in v0.4.0

func (pl *Pool) Size() int

Size reports configured worker count.

func (*Pool) Submit added in v0.4.0

func (pl *Pool) Submit(fn func()) error

Submit runs fn in the pool.

type Ring added in v0.4.0

type Ring[T any] struct {
	// contains filtered or unexported fields
}

Ring is a fixed-size MPMC ring buffer with lock-free push/pop. Capacity must be a power of two; constructor will round up.

func NewRing added in v0.4.0

func NewRing[T any](capacity int) *Ring[T]

NewRing creates a ring buffer with at least the given capacity. Capacity is rounded up to the next power of two.

func (*Ring[T]) Cap added in v0.4.0

func (r *Ring[T]) Cap() int

Cap returns buffer capacity.

func (*Ring[T]) Close added in v0.4.0

func (r *Ring[T]) Close()

Close marks the ring as closed; Push will fail and Pop returns empty when drained.

func (*Ring[T]) Closed added in v0.4.0

func (r *Ring[T]) Closed() bool

Closed reports whether the ring has been closed.

func (*Ring[T]) Len added in v0.4.0

func (r *Ring[T]) Len() int

Len returns the current number of elements.

func (*Ring[T]) Pop added in v0.4.0

func (r *Ring[T]) Pop() (val T, ok bool)

Pop removes and returns an element. ok is false if empty or closed.

func (*Ring[T]) Push added in v0.4.0

func (r *Ring[T]) Push(v T) bool

Push inserts v; returns false if the ring is full or closed.

type SkipListIterator

type SkipListIterator struct {
	// contains filtered or unexported fields
}

SkipListIterator is an iterator over a skiplist object. For new objects, you just need to initialize SkipListIterator.list.

func (*SkipListIterator) Close

func (s *SkipListIterator) Close() error

Close frees the resources held by the iterator

func (*SkipListIterator) Item

func (s *SkipListIterator) Item() Item

func (*SkipListIterator) Key

func (s *SkipListIterator) Key() []byte

Key returns the key at the current position.

func (*SkipListIterator) Next

func (s *SkipListIterator) Next()

Next advances to the next position.

func (*SkipListIterator) Prev

func (s *SkipListIterator) Prev()

Prev moves back to the previous position.

func (*SkipListIterator) Rewind

func (s *SkipListIterator) Rewind()

func (*SkipListIterator) Seek

func (s *SkipListIterator) Seek(target []byte)

Seek advances to the first entry with a key >= target.

func (*SkipListIterator) SeekForPrev

func (s *SkipListIterator) SeekForPrev(target []byte)

SeekForPrev finds an entry with key <= target.

func (*SkipListIterator) SeekToFirst

func (s *SkipListIterator) SeekToFirst()

SeekToFirst seeks position at the first entry in list. Final state of iterator is Valid() iff list is not empty.

func (*SkipListIterator) SeekToLast

func (s *SkipListIterator) SeekToLast()

SeekToLast seeks position at the last entry in list. Final state of iterator is Valid() iff list is not empty.

func (*SkipListIterator) Valid

func (s *SkipListIterator) Valid() bool

Valid returns true iff the iterator is positioned at a valid node.

func (*SkipListIterator) Value

func (s *SkipListIterator) Value() kv.ValueStruct

Value returns value.

func (*SkipListIterator) ValueUint64

func (s *SkipListIterator) ValueUint64() uint64

ValueUint64 returns the uint64 value of the current node.

type Skiplist

type Skiplist struct {
	OnClose func()
	// contains filtered or unexported fields
}

func NewSkiplist

func NewSkiplist(arenaSize int64) *Skiplist

NewSkiplist makes a new empty skiplist, with a given arena size

func (*Skiplist) Add

func (s *Skiplist) Add(e *kv.Entry)

Add inserts the key-value pair.

func (*Skiplist) DecrRef

func (s *Skiplist) DecrRef()

DecrRef decrements the refcount, deallocating the Skiplist when done using it

func (*Skiplist) Draw

func (s *Skiplist) Draw(align bool)

Draw plots the Skiplist; align controls whether the same node is aligned across different levels.

func (*Skiplist) Empty

func (s *Skiplist) Empty() bool

Empty reports whether the Skiplist is empty.

func (*Skiplist) IncrRef

func (s *Skiplist) IncrRef()

IncrRef increases the refcount

func (*Skiplist) MemSize

func (s *Skiplist) MemSize() int64

MemSize returns the size of the Skiplist in terms of how much memory is used within its internal arena.

func (*Skiplist) NewIterator added in v0.4.2

func (s *Skiplist) NewIterator(_ *Options) Iterator

NewIterator returns a skiplist iterator. Options are ignored.

func (*Skiplist) NewSkipListIterator

func (s *Skiplist) NewSkipListIterator() Iterator

NewSkipListIterator returns a skiplist iterator. You have to Close() the iterator.

func (*Skiplist) Search

func (s *Skiplist) Search(key []byte) kv.ValueStruct

Search returns the value associated with the key. It returns a valid value if it finds an equal or earlier version of the same key.

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle is a small wrapper around ants pool that limits concurrent tasks and collects their errors.

func NewThrottle

func NewThrottle(max int) *Throttle

NewThrottle creates a new throttle with a max number of workers.

func (*Throttle) Do

func (t *Throttle) Do() error

Do/Done remain for compatibility with existing call sites that manage their own goroutines. They simply track waitgroup/err collection without limiting via channel (pool should be used via Go).

func (*Throttle) Done

func (t *Throttle) Done(err error)

func (*Throttle) Finish

func (t *Throttle) Finish() error

Finish waits until all workers have finished working. It returns the first error encountered.

func (*Throttle) Go added in v0.4.0

func (t *Throttle) Go(fn func() error) error

Go submits a task to the underlying goroutine pool.

type WaterMark

type WaterMark struct {
	Name string
	// contains filtered or unexported fields
}

WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes finished or "done" according to a WaterMark once Done(k) has been called

  1. as many times as Begin(k) has, AND
  2. a positive number of times.

An index may also become "done" by calling SetDoneUntil at a time such that it is not inter-mingled with Begin/Done calls.

Since doneUntil and lastIndex addresses are passed to sync/atomic packages, we ensure that they are 64-bit aligned by putting them at the beginning of the structure.

func (*WaterMark) Begin

func (w *WaterMark) Begin(index uint64)

Begin sets the last index to the given value.

func (*WaterMark) BeginMany

func (w *WaterMark) BeginMany(indices []uint64)

BeginMany works like Begin but accepts multiple indices.

func (*WaterMark) Done

func (w *WaterMark) Done(index uint64)

Done sets a single index as done.

func (*WaterMark) DoneMany

func (w *WaterMark) DoneMany(indices []uint64)

DoneMany works like Done but accepts multiple indices.

func (*WaterMark) DoneUntil

func (w *WaterMark) DoneUntil() uint64

DoneUntil returns the maximum index that has the property that all indices less than or equal to it are done.

func (*WaterMark) Init

func (w *WaterMark) Init(closer *Closer)

Init initializes a WaterMark struct. MUST be called before using it.

func (*WaterMark) LastIndex

func (w *WaterMark) LastIndex() uint64

LastIndex returns the last index for which Begin has been called.

func (*WaterMark) SetDoneUntil

func (w *WaterMark) SetDoneUntil(val uint64)

SetDoneUntil sets the maximum index that has the property that all indices less than or equal to it are done.

func (*WaterMark) SetLastIndex added in v0.4.2

func (w *WaterMark) SetLastIndex(index uint64)

SetLastIndex advances the last index without changing the watermark counts. It only moves forward to preserve monotonicity.

func (*WaterMark) WaitForMark

func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error

WaitForMark waits until the given index is marked as done.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL