Documentation
¶
Overview ¶
Package services provides business logic for the wallet-backend. This file implements ContractMetadataService for fetching SAC/SEP-41 token metadata via RPC.
Account token caching service - manages PostgreSQL storage of account token holdings including both classic Stellar trustlines and Stellar Asset Contract (SAC) balances.
Index ¶
- Constants
- Variables
- func NewAccountService(models *data.Models, metricsService metrics.MetricsService) (*accountService, error)
- func NewChannelAccountService(_ context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error)
- func NewFeeBumpService(opts FeeBumpServiceOptions) (*feeBumpService, error)
- func NewIngestService(cfg IngestServiceConfig) (*ingestService, error)
- func NewKMSImportService(client kmsiface.KMSAPI, kmsKeyARN string, keypairStore store.KeypairStore, ...) (*kmsImportService, error)
- func NewRPCService(rpcURL, networkPassphrase string, httpClient utils.HTTPClient, ...) (*rpcService, error)
- func NewTransactionService(opts TransactionServiceOptions) (*transactionService, error)
- type AccountService
- type BackfillBatch
- type BackfillMode
- type BackfillResult
- type BatchChanges
- type ChannelAccSigner
- type ChannelAccountService
- type ChannelAccountServiceOptions
- type ContractMetadata
- type ContractMetadataService
- type ContractMetadataServiceMock
- func (c *ContractMetadataServiceMock) FetchMetadata(ctx context.Context, contractTypesByID map[string]types.ContractType) ([]*data.Contract, error)
- func (c *ContractMetadataServiceMock) FetchSACMetadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
- func (c *ContractMetadataServiceMock) FetchSep41Metadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
- func (c *ContractMetadataServiceMock) FetchSingleField(ctx context.Context, contractAddress, functionName string, ...) (xdr.ScVal, error)
- type ContractValidator
- type FeeBumpService
- type FeeBumpServiceOptions
- type GetLedgersResponse
- type HistoryArchiveMock
- func (m *HistoryArchiveMock) BucketExists(bucket historyarchive.Hash) (bool, error)
- func (m *HistoryArchiveMock) BucketSize(bucket historyarchive.Hash) (int64, error)
- func (m *HistoryArchiveMock) CategoryCheckpointExists(cat string, chk uint32) (bool, error)
- func (m *HistoryArchiveMock) GetCheckpointHAS(chk uint32) (historyarchive.HistoryArchiveState, error)
- func (m *HistoryArchiveMock) GetCheckpointManager() historyarchive.CheckpointManager
- func (m *HistoryArchiveMock) GetLatestLedgerSequence() (uint32, error)
- func (m *HistoryArchiveMock) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error)
- func (m *HistoryArchiveMock) GetLedgers(start, end uint32) (map[uint32]*historyarchive.Ledger, error)
- func (m *HistoryArchiveMock) GetPathHAS(path string) (historyarchive.HistoryArchiveState, error)
- func (m *HistoryArchiveMock) GetRootHAS() (historyarchive.HistoryArchiveState, error)
- func (m *HistoryArchiveMock) GetStats() []historyarchive.ArchiveStats
- func (m *HistoryArchiveMock) GetXdrStream(pth string) (*xdr.Stream, error)
- func (m *HistoryArchiveMock) GetXdrStreamForHash(hash historyarchive.Hash) (*xdr.Stream, error)
- func (m *HistoryArchiveMock) ListAllBucketHashes() (chan historyarchive.Hash, chan error)
- func (m *HistoryArchiveMock) ListAllBuckets() (chan string, chan error)
- func (m *HistoryArchiveMock) ListBucket(dp historyarchive.DirPrefix) (chan string, chan error)
- func (m *HistoryArchiveMock) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error)
- func (m *HistoryArchiveMock) PutCheckpointHAS(chk uint32, has historyarchive.HistoryArchiveState, ...) error
- func (m *HistoryArchiveMock) PutPathHAS(path string, has historyarchive.HistoryArchiveState, ...) error
- func (m *HistoryArchiveMock) PutRootHAS(has historyarchive.HistoryArchiveState, opts *historyarchive.CommandOptions) error
- type IngestService
- type IngestServiceConfig
- type KMSImportService
- type LedgerBackendFactory
- type LedgerBackendMock
- func (l *LedgerBackendMock) Close() error
- func (l *LedgerBackendMock) GetLatestLedgerSequence(ctx context.Context) (uint32, error)
- func (l *LedgerBackendMock) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error)
- func (l *LedgerBackendMock) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error)
- func (l *LedgerBackendMock) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error
- type RPCService
- type RPCServiceMock
- func (r *RPCServiceMock) GetAccountLedgerSequence(address string) (int64, error)
- func (r *RPCServiceMock) GetHealth() (entities.RPCGetHealthResult, error)
- func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult
- func (r *RPCServiceMock) GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error)
- func (r *RPCServiceMock) GetLedgers(startLedger uint32, limit uint32) (GetLedgersResponse, error)
- func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error)
- func (r *RPCServiceMock) GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error)
- func (r *RPCServiceMock) NetworkPassphrase() string
- func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error)
- func (r *RPCServiceMock) SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error)
- func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat <-chan any) error
- type TokenIngestionService
- type TokenIngestionServiceMock
- type TransactionService
- type TransactionServiceOptions
Constants ¶
const ( // IngestionModeLive represents continuous ingestion from the latest ledger onwards. IngestionModeLive = "live" // IngestionModeBackfill represents historical ledger ingestion for a specified range. IngestionModeBackfill = "backfill" )
const ( MaxTimeoutInSeconds = 300 DefaultTimeoutInSeconds = 30 )
const ( // MaximumCreateAccountOperationsPerStellarTx is the max number of sponsored accounts we can create in one // transaction due to the signature limit. MaximumCreateAccountOperationsPerStellarTx = 19 )
Variables ¶
var ( ErrAccountNotFound = errors.New("account not found") ErrAccountNotEligibleForBeingSponsored = errors.New("account not eligible for being sponsored") ErrFeeExceedsMaximumBaseFee = errors.New("fee exceeds maximum base fee to sponsor") ErrNoSignaturesProvided = errors.New("should have at least one signature") )
var ( ErrInvalidPrivateKeyProvided = errors.New("invalid private key provided") ErrMismatchDistributionAccount = errors.New("mismatch distribution account") )
var ( ErrInvalidTimeout = errors.New("invalid timeout: timeout cannot be greater than maximum allowed seconds") ErrInvalidOperationChannelAccount = errors.New("invalid operation: operation source account cannot be the channel account") ErrInvalidOperationMissingSource = errors.New("invalid operation: operation source account cannot be empty for non-Soroban operations") ErrInvalidSorobanOperationCount = errors.New("invalid Soroban transaction: must have exactly one operation") ErrInvalidSorobanSimulationEmpty = errors.New("invalid Soroban transaction: simulation response cannot be empty") ErrInvalidSorobanSimulationFailed = errors.New("invalid Soroban transaction: simulation failed") ErrInvalidSorobanOperationType = errors.New("invalid Soroban transaction: operation type not supported") )
var ErrInvalidAddress = errors.New("invalid address: must be a valid Stellar public key or contract address")
var PageLimit = 200
Functions ¶
func NewAccountService ¶
func NewAccountService(models *data.Models, metricsService metrics.MetricsService) (*accountService, error)
func NewChannelAccountService ¶
func NewChannelAccountService(_ context.Context, opts ChannelAccountServiceOptions) (*channelAccountService, error)
func NewFeeBumpService ¶
func NewFeeBumpService(opts FeeBumpServiceOptions) (*feeBumpService, error)
func NewIngestService ¶
func NewIngestService(cfg IngestServiceConfig) (*ingestService, error)
func NewKMSImportService ¶
func NewRPCService ¶
func NewRPCService(rpcURL, networkPassphrase string, httpClient utils.HTTPClient, metricsService metrics.MetricsService) (*rpcService, error)
func NewTransactionService ¶
func NewTransactionService(opts TransactionServiceOptions) (*transactionService, error)
Types ¶
type AccountService ¶
type AccountService interface {
// RegisterAccount registers an externally created Stellar account to be sponsored, and tracked by ingestion
RegisterAccount(ctx context.Context, address string) error
// DeregisterAccount deregisters a Stellar account, no longer sponsoring its transactions, nor tracking it on ingestion
DeregisterAccount(ctx context.Context, address string) error
}
type BackfillBatch ¶
BackfillBatch represents a contiguous range of ledgers to process as a unit.
type BackfillMode ¶
type BackfillMode int
BackfillMode indicates the purpose of backfilling.
const ( // BackfillModeHistorical fills gaps within already-ingested ledger range. BackfillModeHistorical BackfillMode = iota // BackfillModeCatchup fills forward gaps to catch up to network tip. BackfillModeCatchup )
type BackfillResult ¶
type BackfillResult struct {
Batch BackfillBatch
LedgersCount int
Duration time.Duration
Error error
BatchChanges *BatchChanges // Only populated for catchup mode
}
BackfillResult tracks the outcome of processing a single batch.
type BatchChanges ¶
type BatchChanges struct {
TrustlineChangesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange
ContractChanges []types.ContractChange
AccountChangesByAccountID map[string]types.AccountChange
SACBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange
UniqueTrustlineAssets map[uuid.UUID]data.TrustlineAsset
UniqueContractTokensByID map[string]types.ContractType
SACContractsByID map[string]*data.Contract // SAC contract metadata extracted from instance entries
}
BatchChanges holds data collected from a backfill batch for catchup mode. This data is processed after all parallel batches complete to ensure proper ordering.
type ChannelAccSigner ¶
type ChannelAccSigner func(ctx context.Context, tx *txnbuild.Transaction) (*txnbuild.Transaction, error)
type ChannelAccountService ¶
type ChannelAccountServiceOptions ¶
type ChannelAccountServiceOptions struct {
DB db.ConnectionPool
RPCService RPCService
BaseFee int64
DistributionAccountSignatureClient signing.SignatureClient
ChannelAccountSignatureClient signing.SignatureClient
ChannelAccountStore store.ChannelAccountStore
PrivateKeyEncrypter signingutils.PrivateKeyEncrypter
EncryptionPassphrase string
}
func (*ChannelAccountServiceOptions) Validate ¶
func (o *ChannelAccountServiceOptions) Validate() error
type ContractMetadata ¶
ContractMetadata holds the metadata for a SEP-41 contract token (name, symbol, decimals).
type ContractMetadataService ¶
type ContractMetadataService interface {
// FetchSep41Metadata fetches metadata for SEP-41 contracts via RPC without storing.
// Returns []*data.Contract with ID field pre-computed via DeterministicContractID.
FetchSep41Metadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
// FetchSACMetadata fetches metadata for SAC contracts by calling name() via RPC.
// SAC name() returns "code:issuer" format (or "native" for XLM).
// Returns []*data.Contract with Code, Issuer, Name, Symbol, and Decimals=7.
FetchSACMetadata(ctx context.Context, contractIDs []string) ([]*data.Contract, error)
// FetchSingleField fetches a single contract method (name, symbol, decimals, balance, etc...) via RPC simulation.
// The args parameter allows passing arguments to the contract function (e.g., address for balance(id) function).
FetchSingleField(ctx context.Context, contractAddress, functionName string, args ...xdr.ScVal) (xdr.ScVal, error)
}
ContractMetadataService handles fetching metadata (name, symbol, decimals) for SEP-41 token contracts via RPC simulation.
func NewContractMetadataService ¶
func NewContractMetadataService( rpcService RPCService, contractModel data.ContractModelInterface, pool pond.Pool, ) (ContractMetadataService, error)
NewContractMetadataService creates a new ContractMetadataService instance.
type ContractMetadataServiceMock ¶
ContractMetadataServiceMock is a mock implementation of ContractMetadataService
func NewContractMetadataServiceMock ¶
func NewContractMetadataServiceMock(t interface {
mock.TestingT
Cleanup(func())
},
) *ContractMetadataServiceMock
NewContractMetadataServiceMock creates a new instance of ContractMetadataServiceMock.
func (*ContractMetadataServiceMock) FetchMetadata ¶
func (c *ContractMetadataServiceMock) FetchMetadata(ctx context.Context, contractTypesByID map[string]types.ContractType) ([]*data.Contract, error)
func (*ContractMetadataServiceMock) FetchSACMetadata ¶
func (*ContractMetadataServiceMock) FetchSep41Metadata ¶
type ContractValidator ¶
type ContractValidator interface {
ValidateFromContractCode(_ context.Context, _ []byte) (types.ContractType, error)
Close(_ context.Context) error
}
func NewContractValidator ¶
func NewContractValidator() ContractValidator
NewContractValidator creates a new ContractValidator with a configured wazero runtime. The runtime is initialized with custom sections enabled to extract contract specifications from WASM bytecode.
type FeeBumpService ¶
type FeeBumpServiceOptions ¶
type FeeBumpServiceOptions struct {
DistributionAccountSignatureClient signing.SignatureClient
BaseFee int64
Models *data.Models
}
func (*FeeBumpServiceOptions) Validate ¶
func (o *FeeBumpServiceOptions) Validate() error
type GetLedgersResponse ¶
type GetLedgersResponse protocol.GetLedgersResponse
type HistoryArchiveMock ¶
HistoryArchiveMock is a mock implementation of the historyarchive.ArchiveInterface
func (*HistoryArchiveMock) BucketExists ¶
func (m *HistoryArchiveMock) BucketExists(bucket historyarchive.Hash) (bool, error)
func (*HistoryArchiveMock) BucketSize ¶
func (m *HistoryArchiveMock) BucketSize(bucket historyarchive.Hash) (int64, error)
func (*HistoryArchiveMock) CategoryCheckpointExists ¶
func (m *HistoryArchiveMock) CategoryCheckpointExists(cat string, chk uint32) (bool, error)
func (*HistoryArchiveMock) GetCheckpointHAS ¶
func (m *HistoryArchiveMock) GetCheckpointHAS(chk uint32) (historyarchive.HistoryArchiveState, error)
func (*HistoryArchiveMock) GetCheckpointManager ¶
func (m *HistoryArchiveMock) GetCheckpointManager() historyarchive.CheckpointManager
func (*HistoryArchiveMock) GetLatestLedgerSequence ¶
func (m *HistoryArchiveMock) GetLatestLedgerSequence() (uint32, error)
func (*HistoryArchiveMock) GetLedgerHeader ¶
func (m *HistoryArchiveMock) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error)
func (*HistoryArchiveMock) GetLedgers ¶
func (m *HistoryArchiveMock) GetLedgers(start, end uint32) (map[uint32]*historyarchive.Ledger, error)
func (*HistoryArchiveMock) GetPathHAS ¶
func (m *HistoryArchiveMock) GetPathHAS(path string) (historyarchive.HistoryArchiveState, error)
func (*HistoryArchiveMock) GetRootHAS ¶
func (m *HistoryArchiveMock) GetRootHAS() (historyarchive.HistoryArchiveState, error)
func (*HistoryArchiveMock) GetStats ¶
func (m *HistoryArchiveMock) GetStats() []historyarchive.ArchiveStats
func (*HistoryArchiveMock) GetXdrStream ¶
func (m *HistoryArchiveMock) GetXdrStream(pth string) (*xdr.Stream, error)
func (*HistoryArchiveMock) GetXdrStreamForHash ¶
func (m *HistoryArchiveMock) GetXdrStreamForHash(hash historyarchive.Hash) (*xdr.Stream, error)
func (*HistoryArchiveMock) ListAllBucketHashes ¶
func (m *HistoryArchiveMock) ListAllBucketHashes() (chan historyarchive.Hash, chan error)
func (*HistoryArchiveMock) ListAllBuckets ¶
func (m *HistoryArchiveMock) ListAllBuckets() (chan string, chan error)
func (*HistoryArchiveMock) ListBucket ¶
func (m *HistoryArchiveMock) ListBucket(dp historyarchive.DirPrefix) (chan string, chan error)
func (*HistoryArchiveMock) ListCategoryCheckpoints ¶
func (m *HistoryArchiveMock) ListCategoryCheckpoints(cat string, pth string) (chan uint32, chan error)
func (*HistoryArchiveMock) PutCheckpointHAS ¶
func (m *HistoryArchiveMock) PutCheckpointHAS(chk uint32, has historyarchive.HistoryArchiveState, opts *historyarchive.CommandOptions) error
func (*HistoryArchiveMock) PutPathHAS ¶
func (m *HistoryArchiveMock) PutPathHAS(path string, has historyarchive.HistoryArchiveState, opts *historyarchive.CommandOptions) error
func (*HistoryArchiveMock) PutRootHAS ¶
func (m *HistoryArchiveMock) PutRootHAS(has historyarchive.HistoryArchiveState, opts *historyarchive.CommandOptions) error
type IngestService ¶
type IngestService interface {
Run(ctx context.Context, startLedger uint32, endLedger uint32) error
// PersistLedgerData persists processed ledger data to the database in a single atomic transaction.
// This is the shared core used by both live ingestion and loadtest.
// Returns the number of transactions and operations persisted.
PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error)
}
type IngestServiceConfig ¶
type IngestServiceConfig struct {
// === Core ===
IngestionMode string
Models *data.Models
AppTracker apptracker.AppTracker
MetricsService metrics.MetricsService
// === Stellar Network ===
Network string
NetworkPassphrase string
Archive historyarchive.ArchiveInterface
RPCService RPCService
// === Ledger Backend ===
LedgerBackend ledgerbackend.LedgerBackend
LedgerBackendFactory LedgerBackendFactory
// === Cursors ===
LatestLedgerCursorName string
OldestLedgerCursorName string
// === Live Mode Dependencies ===
ChannelAccountStore store.ChannelAccountStore
TokenIngestionService TokenIngestionService
ContractMetadataService ContractMetadataService
// === Processing Options ===
GetLedgersLimit int
SkipTxMeta bool
SkipTxEnvelope bool
EnableParticipantFiltering bool
// === Backfill Tuning ===
BackfillWorkers int
BackfillBatchSize int
BackfillDBInsertBatchSize int
CatchupThreshold int
}
IngestServiceConfig holds the configuration for creating an IngestService.
type KMSImportService ¶
type LedgerBackendFactory ¶
type LedgerBackendFactory func(ctx context.Context) (ledgerbackend.LedgerBackend, error)
LedgerBackendFactory creates new LedgerBackend instances for parallel batch processing. Each batch needs its own backend because LedgerBackend is not thread-safe.
type LedgerBackendMock ¶
LedgerBackendMock is a mock implementation of the ledgerbackend.LedgerBackend interface
func (*LedgerBackendMock) Close ¶
func (l *LedgerBackendMock) Close() error
func (*LedgerBackendMock) GetLatestLedgerSequence ¶
func (l *LedgerBackendMock) GetLatestLedgerSequence(ctx context.Context) (uint32, error)
func (*LedgerBackendMock) GetLedger ¶
func (l *LedgerBackendMock) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error)
func (*LedgerBackendMock) IsPrepared ¶
func (l *LedgerBackendMock) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error)
func (*LedgerBackendMock) PrepareRange ¶
func (l *LedgerBackendMock) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error
type RPCService ¶
type RPCService interface {
GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error)
GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error)
SendTransaction(transactionXDR string) (entities.RPCSendTransactionResult, error)
GetHealth() (entities.RPCGetHealthResult, error)
GetLedgers(startLedger uint32, limit uint32) (GetLedgersResponse, error)
GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error)
GetAccountLedgerSequence(address string) (int64, error)
GetHeartbeatChannel() chan entities.RPCGetHealthResult
// TrackRPCServiceHealth continuously monitors the health of the RPC service and updates metrics.
// It runs health checks at regular intervals and can be triggered on-demand via immediateHealthCheckTrigger.
//
// The immediateHealthCheckTrigger channel allows external components to request an immediate health check,
// which is particularly useful when the ingestor needs to catch up with the RPC service.
//
// Returns an error if the context is cancelled. The caller is responsible for handling shutdown signals.
TrackRPCServiceHealth(ctx context.Context, immediateHealthCheckTrigger <-chan any) error
SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error)
NetworkPassphrase() string
}
type RPCServiceMock ¶
func NewRPCServiceMock ¶
func NewRPCServiceMock(t interface {
mock.TestingT
Cleanup(func())
},
) *RPCServiceMock
NewRPCServiceMock creates a new instance of RPCServiceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*RPCServiceMock) GetAccountLedgerSequence ¶
func (r *RPCServiceMock) GetAccountLedgerSequence(address string) (int64, error)
func (*RPCServiceMock) GetHealth ¶
func (r *RPCServiceMock) GetHealth() (entities.RPCGetHealthResult, error)
func (*RPCServiceMock) GetHeartbeatChannel ¶
func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult
func (*RPCServiceMock) GetLedgerEntries ¶
func (r *RPCServiceMock) GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error)
func (*RPCServiceMock) GetLedgers ¶
func (r *RPCServiceMock) GetLedgers(startLedger uint32, limit uint32) (GetLedgersResponse, error)
func (*RPCServiceMock) GetTransaction ¶
func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error)
func (*RPCServiceMock) GetTransactions ¶
func (r *RPCServiceMock) GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error)
func (*RPCServiceMock) NetworkPassphrase ¶
func (r *RPCServiceMock) NetworkPassphrase() string
func (*RPCServiceMock) SendTransaction ¶
func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error)
func (*RPCServiceMock) SimulateTransaction ¶
func (r *RPCServiceMock) SimulateTransaction(transactionXDR string, resourceConfig entities.RPCResourceConfig) (entities.RPCSimulateTransactionResult, error)
func (*RPCServiceMock) TrackRPCServiceHealth ¶
func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context, triggerHeartbeat <-chan any) error
type TokenIngestionService ¶
type TokenIngestionService interface {
// PopulateAccountTokens performs initial PostgreSQL cache population from Stellar
// history archive for a specific checkpoint. It extracts all trustlines and contract
// tokens from checkpoint ledger entries and stores them in PostgreSQL.
// The checkpointLedger parameter specifies which checkpoint to use for population.
// The initializeCursors callback is invoked within the same DB transaction as
// the metadata storage to ensure atomic initialization.
PopulateAccountTokens(ctx context.Context, checkpointLedger uint32, initializeCursors func(pgx.Tx) error) error
// ProcessTokenChanges applies trustline, contract, and SAC balance changes to PostgreSQL.
// This is called by the indexer for each ledger's state changes during live ingestion.
//
// Storage semantics differ between token types:
// - Trustlines: Can be added or removed. When all trustlines for an account are removed,
// the account's entry is deleted from PostgreSQL.
// - Contracts: Only SEP-41 contracts are tracked (contracts accumulate). Unknown contracts
// are skipped. Contract balance entries persist in the ledger even when balance is zero,
// so we track all contracts an account has ever held a balance in.
// - SAC Balances: For contract addresses (C...) only. Stores absolute balance values with
// authorized and clawback flags. G-addresses use trustlines for SAC balances.
//
// Both trustline and contract IDs are computed using deterministic hash functions (DeterministicAssetID, DeterministicContractID).
ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error
}
TokenIngestionService provides write access to account token storage during ingestion.
func NewTokenIngestionService ¶
func NewTokenIngestionService( dbPool db.ConnectionPool, networkPassphrase string, archive historyarchive.ArchiveInterface, contractValidator ContractValidator, contractMetadataService ContractMetadataService, trustlineAssetModel wbdata.TrustlineAssetModelInterface, trustlineBalanceModel wbdata.TrustlineBalanceModelInterface, nativeBalanceModel wbdata.NativeBalanceModelInterface, sacBalanceModel wbdata.SACBalanceModelInterface, accountContractTokensModel wbdata.AccountContractTokensModelInterface, contractModel wbdata.ContractModelInterface, ) TokenIngestionService
NewTokenIngestionService creates a TokenIngestionService for ingestion.
func NewTokenIngestionServiceForLoadtest ¶
func NewTokenIngestionServiceForLoadtest( dbPool db.ConnectionPool, networkPassphrase string, trustlineBalanceModel wbdata.TrustlineBalanceModelInterface, nativeBalanceModel wbdata.NativeBalanceModelInterface, sacBalanceModel wbdata.SACBalanceModelInterface, accountContractTokensModel wbdata.AccountContractTokensModelInterface, ) TokenIngestionService
NewTokenIngestionServiceForLoadtest creates a minimal TokenIngestionService that only supports ProcessTokenChanges (not PopulateAccountTokens). This is used by the loadtest runner which doesn't need archive/validator/metadata services.
type TokenIngestionServiceMock ¶
TokenIngestionServiceMock is a mock implementation of the TokenIngestionService interface
func NewTokenIngestionServiceMock ¶
func NewTokenIngestionServiceMock(t interface {
mock.TestingT
Cleanup(func())
},
) *TokenIngestionServiceMock
NewTokenIngestionServiceMock creates a new instance of TokenIngestionServiceMock.
func (*TokenIngestionServiceMock) PopulateAccountTokens ¶
func (*TokenIngestionServiceMock) ProcessTokenChanges ¶
func (m *TokenIngestionServiceMock) ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error
type TransactionService ¶
type TransactionService interface {
NetworkPassphrase() string
BuildAndSignTransactionWithChannelAccount(ctx context.Context, transaction *txnbuild.GenericTransaction, simulationResult *entities.RPCSimulateTransactionResult) (*txnbuild.Transaction, error)
}
type TransactionServiceOptions ¶
type TransactionServiceOptions struct {
DB db.ConnectionPool
DistributionAccountSignatureClient signing.SignatureClient
ChannelAccountSignatureClient signing.SignatureClient
ChannelAccountStore store.ChannelAccountStore
RPCService RPCService
BaseFee int64
}
func (*TransactionServiceOptions) ValidateOptions ¶
func (o *TransactionServiceOptions) ValidateOptions() error