services

package
v0.0.0-...-6dc5a59 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: Apache-2.0 Imports: 50 Imported by: 0

Documentation

Overview

Package services provides business logic for the wallet-backend. This file implements ContractMetadataService for fetching SAC/SEP-41 token metadata via RPC.

Account token caching service - manages PostgreSQL storage of account token holdings including both classic Stellar trustlines and Stellar Asset Contract (SAC) balances.

Index

Constants

View Source
const (

	// IngestionModeLive represents continuous ingestion from the latest ledger onwards.
	IngestionModeLive = "live"
	// IngestionModeBackfill represents historical ledger ingestion for a specified range.
	IngestionModeBackfill = "backfill"
)
View Source
const (
	MaxTimeoutInSeconds     = 300
	DefaultTimeoutInSeconds = 30
)
View Source
const (
	// MaximumCreateAccountOperationsPerStellarTx is the max number of sponsored accounts we can create in one
	// transaction due to the signature limit.
	MaximumCreateAccountOperationsPerStellarTx = 19
)

Variables

View Source
var (
	ErrAccountNotFound                     = errors.New("account not found")
	ErrAccountNotEligibleForBeingSponsored = errors.New("account not eligible for being sponsored")
	ErrFeeExceedsMaximumBaseFee            = errors.New("fee exceeds maximum base fee to sponsor")
	ErrNoSignaturesProvided                = errors.New("should have at least one signature")
)
View Source
var (
	ErrInvalidPrivateKeyProvided   = errors.New("invalid private key provided")
	ErrMismatchDistributionAccount = errors.New("mismatch distribution account")
)
View Source
var (
	ErrInvalidTimeout                 = errors.New("invalid timeout: timeout cannot be greater than maximum allowed seconds")
	ErrInvalidOperationChannelAccount = errors.New("invalid operation: operation source account cannot be the channel account")
	ErrInvalidOperationMissingSource  = errors.New("invalid operation: operation source account cannot be empty for non-Soroban operations")
	ErrInvalidSorobanOperationCount   = errors.New("invalid Soroban transaction: must have exactly one operation")
	ErrInvalidSorobanSimulationEmpty  = errors.New("invalid Soroban transaction: simulation response cannot be empty")
	ErrInvalidSorobanSimulationFailed = errors.New("invalid Soroban transaction: simulation failed")
	ErrInvalidSorobanOperationType    = errors.New("invalid Soroban transaction: operation type not supported")
)
View Source
var ErrInvalidAddress = errors.New("invalid address: must be a valid Stellar public key or contract address")
View Source
var PageLimit = 200

Functions

func NewAccountService

func NewAccountService(models *data.Models, metricsService metrics.MetricsService) (*accountService, error)

func NewChannelAccountService

func NewChannelAccountService(_ context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error)

func NewFeeBumpService

func NewFeeBumpService(opts FeeBumpServiceOptions) (*feeBumpService, error)

func NewIngestService

func NewIngestService(cfg IngestServiceConfig) (*ingestService, error)

func NewKMSImportService

func NewKMSImportService(client kmsiface.KMSAPI, kmsKeyARN string, keypairStore store.KeypairStore, distributionAccountPublicKey string) (*kmsImportService, error)

func NewRPCService

func NewRPCService(rpcURL, networkPassphrase string, httpClient utils.HTTPClient, metricsService metrics.MetricsService) (*rpcService, error)

func NewTransactionService

func NewTransactionService(opts TransactionServiceOptions) (*transactionService, error)

Types

type AccountService

type AccountService interface {
	// RegisterAccount registers an externally created Stellar account to be sponsored, and tracked by ingestion
	RegisterAccount(ctx context.Context, address string) error
	// DeregisterAccount deregisters a Stellar account, no longer sponsoring its transactions, nor tracking it on ingestion
	DeregisterAccount(ctx context.Context, address string) error
}

type BackfillBatch

type BackfillBatch struct {
	StartLedger uint32
	EndLedger   uint32
}

BackfillBatch represents a contiguous range of ledgers to process as a unit.

type BackfillMode

type BackfillMode int

BackfillMode indicates the purpose of backfilling.

const (
	// BackfillModeHistorical fills gaps within already-ingested ledger range.
	BackfillModeHistorical BackfillMode = iota
	// BackfillModeCatchup fills forward gaps to catch up to network tip.
	BackfillModeCatchup
)

type BackfillResult

type BackfillResult struct {
	Batch        BackfillBatch
	LedgersCount int
	Duration     time.Duration
	Error        error
	BatchChanges *BatchChanges // Only populated for catchup mode
}

BackfillResult tracks the outcome of processing a single batch.

type BatchChanges

type BatchChanges struct {
	TrustlineChangesByKey     map[indexer.TrustlineChangeKey]types.TrustlineChange
	ContractChanges           []types.ContractChange
	AccountChangesByAccountID map[string]types.AccountChange
	SACBalanceChangesByKey    map[indexer.SACBalanceChangeKey]types.SACBalanceChange
	UniqueTrustlineAssets     map[uuid.UUID]data.TrustlineAsset
	UniqueContractTokensByID  map[string]types.ContractType
	SACContractsByID          map[string]*data.Contract // SAC contract metadata extracted from instance entries
}

BatchChanges holds data collected from a backfill batch for catchup mode. This data is processed after all parallel batches complete to ensure proper ordering.

type ChannelAccSigner

type ChannelAccSigner func(ctx context.Context, tx *txnbuild.Transaction) (*txnbuild.Transaction, error)

type ChannelAccountService

type ChannelAccountService interface {
	EnsureChannelAccounts(ctx context.Context, number int64) error
}

type ChannelAccountServiceOptions

type ChannelAccountServiceOptions struct {
	DB                                 db.ConnectionPool
	RPCService                         RPCService
	BaseFee                            int64
	DistributionAccountSignatureClient signing.SignatureClient
	ChannelAccountSignatureClient      signing.SignatureClient
	ChannelAccountStore                store.ChannelAccountStore
	PrivateKeyEncrypter                signingutils.PrivateKeyEncrypter
	EncryptionPassphrase               string
}

func (*ChannelAccountServiceOptions) Validate

func (o *ChannelAccountServiceOptions) Validate() error

type ContractMetadata

type ContractMetadata struct {
	ContractID string
	Name       string
	Symbol     string
	Decimals   uint32
}

ContractMetadata holds the metadata for a SEP-41 contract token (name, symbol, decimals).

type ContractMetadataService

type ContractMetadataService interface {
	// FetchSep41Metadata fetches metadata for SEP-41 contracts via RPC without storing.
	// Returns []*data.Contract with ID field pre-computed via DeterministicContractID.
	FetchSep41Metadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
	// FetchSACMetadata fetches metadata for SAC contracts by calling name() via RPC.
	// SAC name() returns "code:issuer" format (or "native" for XLM).
	// Returns []*data.Contract with Code, Issuer, Name, Symbol, and Decimals=7.
	FetchSACMetadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
	// FetchSingleField fetches a single contract method (name, symbol, decimals, balance, etc...) via RPC simulation.
	// The args parameter allows passing arguments to the contract function (e.g., address for balance(id) function).
	FetchSingleField(ctx context.Context, contractAddress, functionName string, args ...xdr.ScVal) (xdr.ScVal, error)
}

ContractMetadataService handles fetching metadata (name, symbol, decimals) for SEP-41 token contracts via RPC simulation.

func NewContractMetadataService

func NewContractMetadataService(
	rpcService RPCService,
	contractModel data.ContractModelInterface,
	pool pond.Pool,
) (ContractMetadataService, error)

NewContractMetadataService creates a new ContractMetadataService instance.

type ContractMetadataServiceMock

type ContractMetadataServiceMock struct {
	mock.Mock
}

ContractMetadataServiceMock is a mock implementation of ContractMetadataService

func NewContractMetadataServiceMock

func NewContractMetadataServiceMock(t interface {
	mock.TestingT
	Cleanup(func())
},
) *ContractMetadataServiceMock

NewContractMetadataServiceMock creates a new instance of ContractMetadataServiceMock.

func (*ContractMetadataServiceMock) FetchMetadata

func (c *ContractMetadataServiceMock) FetchMetadata(ctx context.Context, contractTypesByID map[string]types.ContractType) ([]*data.Contract, error)

func (*ContractMetadataServiceMock) FetchSACMetadata

func (c *ContractMetadataServiceMock) FetchSACMetadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)

func (*ContractMetadataServiceMock) FetchSep41Metadata

func (c *ContractMetadataServiceMock) FetchSep41Metadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)

func (*ContractMetadataServiceMock) FetchSingleField

func (c *ContractMetadataServiceMock) FetchSingleField(ctx context.Context, contractAddress, functionName string, funcArgs ...xdr.ScVal) (xdr.ScVal, error)

type ContractValidator

type ContractValidator interface {
	ValidateFromContractCode(_ context.Context, _ []byte) (types.ContractType, error)
	Close(_ context.Context) error
}

func NewContractValidator

func NewContractValidator() ContractValidator

NewContractValidator creates a new ContractValidator with a configured wazero runtime. The runtime is initialized with custom sections enabled to extract contract specifications from WASM bytecode.

type FeeBumpService

type FeeBumpService interface {
	WrapTransaction(ctx context.Context, tx *txnbuild.Transaction) (string, string, error)
	GetMaximumBaseFee() int64
}

type FeeBumpServiceOptions

type FeeBumpServiceOptions struct {
	DistributionAccountSignatureClient signing.SignatureClient
	BaseFee                            int64
	Models                             *data.Models
}

func (*FeeBumpServiceOptions) Validate

func (o *FeeBumpServiceOptions) Validate() error

type GetLedgersResponse

type GetLedgersResponse protocol.GetLedgersResponse

type HistoryArchiveMock

type HistoryArchiveMock struct {
	mock.Mock
}

HistoryArchiveMock is a mock implementation of the historyarchive.ArchiveInterface

func (*HistoryArchiveMock) BucketExists

func (m *HistoryArchiveMock) BucketExists(bucket historyarchive.Hash) (bool, error)

func (*HistoryArchiveMock) BucketSize

func (m *HistoryArchiveMock) BucketSize(bucket historyarchive.Hash) (int64, error)

func (*HistoryArchiveMock) CategoryCheckpointExists

func (m *HistoryArchiveMock) CategoryCheckpointExists(cat string, chk uint32) (bool, error)

func (*HistoryArchiveMock) GetCheckpointHAS

func (*HistoryArchiveMock) GetCheckpointManager

func (m *HistoryArchiveMock) GetCheckpointManager() historyarchive.CheckpointManager

func (*HistoryArchiveMock) GetLatestLedgerSequence

func (m *HistoryArchiveMock) GetLatestLedgerSequence() (uint32, error)

func (*HistoryArchiveMock) GetLedgerHeader

func (m *HistoryArchiveMock) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error)

func (*HistoryArchiveMock) GetLedgers

func (m *HistoryArchiveMock) GetLedgers(start, end uint32) (map[uint32]*historyarchive.Ledger, error)

func (*HistoryArchiveMock) GetPathHAS

func (*HistoryArchiveMock) GetRootHAS

func (*HistoryArchiveMock) GetStats

func (*HistoryArchiveMock) GetXdrStream

func (m *HistoryArchiveMock) GetXdrStream(pth string) (*xdr.Stream, error)

func (*HistoryArchiveMock) GetXdrStreamForHash

func (m *HistoryArchiveMock) GetXdrStreamForHash(hash historyarchive.Hash) (*xdr.Stream, error)

func (*HistoryArchiveMock) ListAllBucketHashes

func (m *HistoryArchiveMock) ListAllBucketHashes() (chan historyarchive.Hash, chan error)

func (*HistoryArchiveMock) ListAllBuckets

func (m *HistoryArchiveMock) ListAllBuckets() (chan string, chan error)

func (*HistoryArchiveMock) ListBucket

func (m *HistoryArchiveMock) ListBucket(dp historyarchive.DirPrefix) (chan string, chan error)

func (*HistoryArchiveMock) ListCategoryCheckpoints

func (m *HistoryArchiveMock) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error)

func (*HistoryArchiveMock) PutCheckpointHAS

func (*HistoryArchiveMock) PutPathHAS

func (*HistoryArchiveMock) PutRootHAS

type IngestService

type IngestService interface {
	Run(ctx context.Context, startLedger uint32, endLedger uint32) error
	// PersistLedgerData persists processed ledger data to the database in a single atomic transaction.
	// This is the shared core used by both live ingestion and loadtest.
	// Returns the number of transactions and operations persisted.
	PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error)
}

type IngestServiceConfig

type IngestServiceConfig struct {
	// === Core ===
	IngestionMode  string
	Models         *data.Models
	AppTracker     apptracker.AppTracker
	MetricsService metrics.MetricsService

	// === Stellar Network ===
	Network           string
	NetworkPassphrase string
	Archive           historyarchive.ArchiveInterface
	RPCService        RPCService

	// === Ledger Backend ===
	LedgerBackend        ledgerbackend.LedgerBackend
	LedgerBackendFactory LedgerBackendFactory

	// === Cursors ===
	LatestLedgerCursorName string
	OldestLedgerCursorName string

	// === Live Mode Dependencies ===
	ChannelAccountStore     store.ChannelAccountStore
	TokenIngestionService   TokenIngestionService
	ContractMetadataService ContractMetadataService

	// === Processing Options ===
	GetLedgersLimit            int
	SkipTxMeta                 bool
	SkipTxEnvelope             bool
	EnableParticipantFiltering bool

	// === Backfill Tuning ===
	BackfillWorkers           int
	BackfillBatchSize         int
	BackfillDBInsertBatchSize int
	CatchupThreshold          int
}

IngestServiceConfig holds the configuration for creating an IngestService.

type KMSImportService

type KMSImportService interface {
	ImportDistributionAccountKey(ctx context.Context, distributionAccountSeed string) error
}

type LedgerBackendFactory

type LedgerBackendFactory func(ctx context.Context) (ledgerbackend.LedgerBackend, error)

LedgerBackendFactory creates new LedgerBackend instances for parallel batch processing. Each batch needs its own backend because LedgerBackend is not thread-safe.

type LedgerBackendMock

type LedgerBackendMock struct {
	mock.Mock
}

LedgerBackendMock is a mock implementation of the ledgerbackend.LedgerBackend interface

func (*LedgerBackendMock) Close

func (l *LedgerBackendMock) Close() error

func (*LedgerBackendMock) GetLatestLedgerSequence

func (l *LedgerBackendMock) GetLatestLedgerSequence(ctx context.Context) (uint32, error)

func (*LedgerBackendMock) GetLedger

func (l *LedgerBackendMock) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error)

func (*LedgerBackendMock) IsPrepared

func (l *LedgerBackendMock) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error)

func (*LedgerBackendMock) PrepareRange

func (l *LedgerBackendMock) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error

type RPCService

type RPCService interface {
	GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error)
	GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error)
	SendTransaction(transactionXDR string) (entities.RPCSendTransactionResult, error)
	GetHealth() (entities.RPCGetHealthResult, error)
	GetLedgers(startLedger uint32, limit uint32) (GetLedgersResponse, error)
	GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error)
	GetAccountLedgerSequence(address string) (int64, error)
	GetHeartbeatChannel() chan entities.RPCGetHealthResult
	// TrackRPCServiceHealth continuously monitors the health of the RPC service and updates metrics.
	// It runs health checks at regular intervals and can be triggered on-demand via immediateHealthCheckTrigger.
	//
	// The immediateHealthCheckTrigger channel allows external components to request an immediate health check,
	// which is particularly useful when the ingestor needs to catch up with the RPC service.
	//
	// Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals.
	TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error
	SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error)
	NetworkPassphrase() string
}

type RPCServiceMock

type RPCServiceMock struct {
	mock.Mock
}

func NewRPCServiceMock

func NewRPCServiceMock(t interface {
	mock.TestingT
	Cleanup(func())
},
) *RPCServiceMock

NewRPCServiceMock creates a new instance of RPCServiceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*RPCServiceMock) GetAccountLedgerSequence

func (r *RPCServiceMock) GetAccountLedgerSequence(address string) (int64, error)

func (*RPCServiceMock) GetHealth

func (r *RPCServiceMock) GetHealth() (entities.RPCGetHealthResult, error)

func (*RPCServiceMock) GetHeartbeatChannel

func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult

func (*RPCServiceMock) GetLedgerEntries

func (r *RPCServiceMock) GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error)

func (*RPCServiceMock) GetLedgers

func (r *RPCServiceMock) GetLedgers(startLedger uint32, limit uint32) (GetLedgersResponse, error)

func (*RPCServiceMock) GetTransaction

func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error)

func (*RPCServiceMock) GetTransactions

func (r *RPCServiceMock) GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error)

func (*RPCServiceMock) NetworkPassphrase

func (r *RPCServiceMock) NetworkPassphrase() string

func (*RPCServiceMock) SendTransaction

func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error)

func (*RPCServiceMock) SimulateTransaction

func (r *RPCServiceMock) SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error)

func (*RPCServiceMock) TrackRPCServiceHealth

func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat <-chan any) error

type TokenIngestionService

type TokenIngestionService interface {
	// PopulateAccountTokens performs initial PostgreSQL cache population from Stellar
	// history archive for a specific checkpoint. It extracts all trustlines and contract
	// tokens from checkpoint ledger entries and stores them in PostgreSQL.
	// The checkpointLedger parameter specifies which checkpoint to use for population.
	// The initializeCursors callback is invoked within the same DB transaction as
	// the metadata storage to ensure atomic initialization.
	PopulateAccountTokens(ctx context.Context, checkpointLedger uint32, initializeCursors func(pgx.Tx) error) error

	// ProcessTokenChanges applies trustline, contract, and SAC balance changes to PostgreSQL.
	// This is called by the indexer for each ledger's state changes during live ingestion.
	//
	// Storage semantics differ between token types:
	// - Trustlines: Can be added or removed. When all trustlines for an account are removed,
	//   the account's entry is deleted from PostgreSQL.
	// - Contracts: Only SEP-41 contracts are tracked (contracts accumulate). Unknown contracts
	//   are skipped. Contract balance entries persist in the ledger even when balance is zero,
	//   so we track all contracts an account has ever held a balance in.
	// - SAC Balances: For contract addresses (C...) only. Stores absolute balance values with
	//   authorized and clawback flags. G-addresses use trustlines for SAC balances.
	//
	// Both trustline and contract IDs are computed using deterministic hash functions (DeterministicAssetID, DeterministicContractID).
	ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error
}

TokenIngestionService provides write access to account token storage during ingestion.

func NewTokenIngestionService

func NewTokenIngestionService(
	dbPool db.ConnectionPool,
	networkPassphrase string,
	archive historyarchive.ArchiveInterface,
	contractValidator ContractValidator,
	contractMetadataService ContractMetadataService,
	trustlineAssetModel wbdata.TrustlineAssetModelInterface,
	trustlineBalanceModel wbdata.TrustlineBalanceModelInterface,
	nativeBalanceModel wbdata.NativeBalanceModelInterface,
	sacBalanceModel wbdata.SACBalanceModelInterface,
	accountContractTokensModel wbdata.AccountContractTokensModelInterface,
	contractModel wbdata.ContractModelInterface,
) TokenIngestionService

NewTokenIngestionService creates a TokenIngestionService for ingestion.

func NewTokenIngestionServiceForLoadtest

func NewTokenIngestionServiceForLoadtest(
	dbPool db.ConnectionPool,
	networkPassphrase string,
	trustlineBalanceModel wbdata.TrustlineBalanceModelInterface,
	nativeBalanceModel wbdata.NativeBalanceModelInterface,
	sacBalanceModel wbdata.SACBalanceModelInterface,
	accountContractTokensModel wbdata.AccountContractTokensModelInterface,
) TokenIngestionService

NewTokenIngestionServiceForLoadtest creates a minimal TokenIngestionService that only supports ProcessTokenChanges (not PopulateAccountTokens). This is used by the loadtest runner which doesn't need archive/validator/metadata services.

type TokenIngestionServiceMock

type TokenIngestionServiceMock struct {
	mock.Mock
}

TokenIngestionServiceMock is a mock implementation of the TokenIngestionService interface

func NewTokenIngestionServiceMock

func NewTokenIngestionServiceMock(t interface {
	mock.TestingT
	Cleanup(func())
},
) *TokenIngestionServiceMock

NewTokenIngestionServiceMock creates a new instance of TokenIngestionServiceMock.

func (*TokenIngestionServiceMock) PopulateAccountTokens

func (m *TokenIngestionServiceMock) PopulateAccountTokens(ctx context.Context, checkpointLedger uint32, initializeCursors func(pgx.Tx) error) error

func (*TokenIngestionServiceMock) ProcessTokenChanges

func (m *TokenIngestionServiceMock) ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error

type TransactionService

type TransactionService interface {
	NetworkPassphrase() string
	BuildAndSignTransactionWithChannelAccount(ctx context.Context, transaction *txnbuild.GenericTransaction, simulationResult *entities.RPCSimulateTransactionResult) (*txnbuild.Transaction, error)
}

type TransactionServiceOptions

type TransactionServiceOptions struct {
	DB                                 db.ConnectionPool
	DistributionAccountSignatureClient signing.SignatureClient
	ChannelAccountSignatureClient      signing.SignatureClient
	ChannelAccountStore                store.ChannelAccountStore
	RPCService                         RPCService
	BaseFee                            int64
}

func (*TransactionServiceOptions) ValidateOptions

func (o *TransactionServiceOptions) ValidateOptions() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL