src

package
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2025 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ColumnSeparator            = '☆' // make me happy
	DefaultGenRowCount         = 1000
	GenDataFileFirstLinePrefix = "columns:" // optional first line prefix if stream load needs 'columns: xxx' header
)
View Source
const (
	ReplaySqlPrefix = `/*dodo{`
	ReplaySqlSuffix = `*/`

	ReplayResultFileExt      = ".result"
	ReplayCustomClientPrefix = "client"
)
View Source
const (
	AnonymizeHashBytes = 8
)
View Source
const (
	LLMOutputPrefix = "```yaml\n"
)
View Source
const (
	StreamLoadMaxRetries = 3
)

Variables

View Source
var (
	InternalSqlComment = "/*dodo*/"
)
View Source
var (
	NumberRe = regexp.MustCompile(`\d+`)
)

Functions

func Anonymize

func Anonymize(method string, s string) string

func AnonymizeSql

func AnonymizeSql(method string, sqlId, sql string) string

func Choose

func Choose(msg string, items []string) (string, error)

func Confirm

func Confirm(msg string) bool

func CountAuditlogs

func CountAuditlogs(
	ctx context.Context,
	db *sqlx.DB,
	dbname, table string,
	opts AuditLogScanOpts,
) (int, error)

func DecodeReplaySqls

func DecodeReplaySqls(
	s *bufio.Scanner,
	dbs, users map[string]struct{},
	from, to int64,
	clientCount int,
) (map[string][]*ReplaySql, int64, int, error)

func DetectCharset

func DetectCharset(r *bufio.Reader) (string, error)

func EncodeReplaySql

func EncodeReplaySql(ts, client, user, db, queryId, stmt string, durationMs int64) string

func ExpandHome

func ExpandHome(path string) string

func ExtractQueriesFromAuditLogs

func ExtractQueriesFromAuditLogs(
	writers []SqlWriter,
	auditlogPaths []string,
	encoding string,
	opts AuditLogScanOpts,
	parallel int,
) (int, error)

ExtractQueriesFromAuditLogs extracts the queries from audit logs.

func FileGlob

func FileGlob(paths []string) ([]string, error)

func GetDBAuditLogs

func GetDBAuditLogs(
	ctx context.Context,
	w SqlWriter,
	db *sqlx.DB,
	dbname, table string,
	opts AuditLogScanOpts,
	parallel int,
) (int, error)

func GetEncoding

func GetEncoding(name string) (encoding.Encoding, error)

func IsStringType

func IsStringType(colType string) bool

func LLMGendataConfig

func LLMGendataConfig(
	ctx context.Context,
	apiKey, baseURL, model, prompt_ string,
	tables, columnStats, sqls []string,
) (string, error)

LLMGendataConfig uses Deepseek by default, but you can use OpenAI by setting the apiKey and baseURL.

func MustJsonMarshal

func MustJsonMarshal(v any) []byte

func MustYamlMarshal

func MustYamlMarshal(v any) []byte

func NewDB

func NewDB(host string, port uint16, user, password, catalog, db string) (*sqlx.DB, error)

func ParallelGroup

func ParallelGroup(parallel int) *errgroup.Group

func ReadFileOrStdin

func ReadFileOrStdin(path string) (string, error)

func ReplaySqls

func ReplaySqls(
	ctx context.Context,
	host string, port uint16, user, password, catalog, cluster string,
	resultDir string, clientSqls []ClientSqls, speed float32, maxHashRows int, maxConnIdleTime time.Duration,
	minTs int64, parallel int,
) error

func RunCreateSQL

func RunCreateSQL(ctx context.Context, conn *sqlx.DB, db string, sqlFile string, beCount int, dryrun bool) (needDependence string, err error)

func SanitizeLike

func SanitizeLike(s string) string

func ScpFromRemote

func ScpFromRemote(ctx context.Context, privKey, remoteUrl, localPath string) error

ScpFromRemote copies a file from a remote server to the local machine using scp.

privKey is the path to the private key to use for authentication.
remoteUrl is the address of the file on the remote server, in the format user@host:port/path.
localPath is the path of the local file to copy to.

func SetupAnonymizer

func SetupAnonymizer(method, hashdictPath string, idMinLength int, reserveIds ...string)

func ShowBackendCount

func ShowBackendCount(ctx context.Context, conn *sqlx.DB) (count int, err error)

func ShowDatabases

func ShowDatabases(ctx context.Context, conn *sqlx.DB, dbnamePrefix string) ([]string, error)

func ShowFronendsDisksDir

func ShowFronendsDisksDir(ctx context.Context, conn *sqlx.DB, diskType string) (dir string, err error)

func SshLs

func SshLs(_ context.Context, privKey, remoteUrl string) ([]string, error)

func StoreMiniHashDict

func StoreMiniHashDict(method, hashdictPath string)

func StreamLoad

func StreamLoad(ctx context.Context, host, httpPort, user, password, db, table, file, fileProgress string, dryrun bool) error

func WriteFile

func WriteFile(path string, content string) error

Types

type AuditLogScanOpts

type AuditLogScanOpts struct {
	// filter
	DBs                []string
	QueryMinDurationMs int64
	QueryStates        []string
	OnlySelect         bool
	From, To           string

	Strict bool
}

type AuditLogScanner

type AuditLogScanner interface {
	Init()
	ScanOne(oneLine []byte) error
	Consume(w SqlWriter) (int, error)
	Close()
}

Not thread safe.

func NewAuditLogScanner

func NewAuditLogScanner(opts AuditLogScanOpts) AuditLogScanner

type BytesEncoder

type BytesEncoder interface {
	Encode(b []byte) ([]byte, error)
}

func NewBytesEncoder

func NewBytesEncoder(srcEncoding encoding.Encoding) BytesEncoder

type ClientSqls

type ClientSqls struct {
	Client string
	Sqls   []*ReplaySql
}

type ColumnStats

type ColumnStats struct {
	Name        string `yaml:"name"`
	Count       int64  `yaml:"-"`
	Ndv         int64  `yaml:"ndv"`
	NullCount   int64  `yaml:"null_count"`
	DataSize    int64  `yaml:"data_size"`
	AvgSizeByte int64  `yaml:"avg_size_byte"`
	Min         string `yaml:"min"`
	Max         string `yaml:"max"`
	Method      string `yaml:"method"`
}

type CreateParserListener

type CreateParserListener struct {
	*parser.BaseDorisParserListener
	// contains filtered or unexported fields
}

func (*CreateParserListener) ExitPropertyItem

func (l *CreateParserListener) ExitPropertyItem(ctx *parser.PropertyItemContext)

ExitPropertyItem modifies the property value.

type DBSchema

type DBSchema struct {
	Name    string        `yaml:"db"`
	Schemas []*Schema     `yaml:"-"`
	Stats   []*TableStats `yaml:"tables,omitempty"`
}

type DummyEncoder

type DummyEncoder struct {
}

func (*DummyEncoder) Encode

func (*DummyEncoder) Encode(b []byte) ([]byte, error)

type GenRule

type GenRule = gen.GenRule

type ReplayClient

type ReplayClient struct {
	// contains filtered or unexported fields
}

func (*ReplayClient) Close

func (c *ReplayClient) Close(closefile bool)

type ReplayResult

type ReplayResult struct {
	Ts      string `json:"ts,omitempty"`
	QueryId string `json:"queryId"`

	ReturnRows     int    `json:"returnRows"`
	ReturnRowsHash string `json:"returnRowsHash,omitempty"`
	DurationMs     int64  `json:"durationMs"`
	Err            string `json:"err,omitempty"`
	Stmt           string `json:"stmt,omitempty"`
}

func (*ReplayResult) String

func (re *ReplayResult) String() string

type ReplaySql

type ReplaySql struct {
	ReplaySqlMeta

	Stmt string
}

func (*ReplaySql) ToReplayResult

func (s *ReplaySql) ToReplayResult() *ReplayResult

type ReplaySqlMeta

type ReplaySqlMeta struct {
	Ts_        string `json:"ts"`
	Ts         int64  `json:"-"`
	Client     string `json:"client"`
	User       string `json:"user"`
	Db         string `json:"db"`
	QueryId    string `json:"queryId"`
	DurationMs int64  `json:"durationMs,omitempty"`
}

ReplaySqlMeta is prepended to every SQL statement as a comment.

e.g. "/*dodo{"ts": "2024-09-20 00:00:00", "client": "127.0.0.1:32345", "user": "root", "db": "test", "queryId": "1"}*/ <the sql>"

func (*ReplaySqlMeta) Timestamp

func (m *ReplaySqlMeta) Timestamp() (ms int64, err error)

type Schema

type Schema struct {
	Name       string     `db:"TABLE_NAME"`
	Type       SchemaType `db:"TABLE_TYPE"`
	DB         string     `db:"TABLE_SCHEMA"`
	CreateStmt string     `db:"-"`
}

func ShowCreateTables

func ShowCreateTables(ctx context.Context, conn *sqlx.DB, db string, dbTables ...string) (schemas []*Schema, err error)

func ShowTables

func ShowTables(ctx context.Context, conn *sqlx.DB, dbname string, tablenamePrefix ...string) (tables []*Schema, err error)

func (*Schema) String

func (s *Schema) String() string

type SchemaType

type SchemaType string
var (
	SchemaTypeTable            SchemaType = "TABLE"
	SchemaTypeView             SchemaType = "VIEW"
	SchemaTypeMaterializedView SchemaType = "MATERIALIZED_VIEW"
)

func (SchemaType) Lower

func (s SchemaType) Lower() string

type SimpleAuditLogScanner

type SimpleAuditLogScanner struct {
	AuditLogScanOpts
	// contains filtered or unexported fields
}

func NewSimpleAuditLogScanner

func NewSimpleAuditLogScanner(opts AuditLogScanOpts) *SimpleAuditLogScanner

func (*SimpleAuditLogScanner) Close

func (*SimpleAuditLogScanner) Close()

func (*SimpleAuditLogScanner) Consume

func (s *SimpleAuditLogScanner) Consume(w SqlWriter) (int, error)

func (*SimpleAuditLogScanner) Init

func (s *SimpleAuditLogScanner) Init()

func (*SimpleAuditLogScanner) ScanOne

func (s *SimpleAuditLogScanner) ScanOne(oneLog []byte) error

type SqlWriter

type SqlWriter interface {
	io.Closer
	WriteSql(s string) error
}

type TableGen

type TableGen struct {
	Name       string
	Columns    []string
	DDLFile    string
	Rows       int
	RefToTable map[string]struct{} // ref generator to other tables

	StreamloadColMapping string
	// contains filtered or unexported fields
}

func NewTableGen

func NewTableGen(ddlfile, createTableStmt string, stats *TableStats, rows int, streamloadColNames []string) (*TableGen, error)

func (*TableGen) GenCSV

func (tg *TableGen) GenCSV(w *bufio.Writer, rows int) error

GenCSV generates multiple CSV lines into the writer.

func (*TableGen) RecordRefTables

func (tg *TableGen) RecordRefTables(ts ...string)

func (*TableGen) RemoveRefTable

func (tg *TableGen) RemoveRefTable(t string)

type TableStats

type TableStats struct {
	Name     string         `yaml:"name"`
	RowCount int64          `yaml:"row_count"`
	Columns  []*ColumnStats `yaml:"columns,omitempty"`
}

func GetTablesStats

func GetTablesStats(ctx context.Context, conn *sqlx.DB, analyze bool, dbname string, tables ...string) ([]*TableStats, error)

type Utf8Encoder

type Utf8Encoder struct {
	// contains filtered or unexported fields
}

func (*Utf8Encoder) Encode

func (e *Utf8Encoder) Encode(b []byte) ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL