cometdump

package module
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2025 License: MIT Imports: 35 Imported by: 0

README

CometDump

PkgGoDev

A Go library and CLI tool for efficiently storing and retrieving CometBFT blockchain data. CometDump downloads blocks from CometBFT-based chains and stores them in compressed chunks for fast local access.

Features

  • 🚀 Fast Block Retrieval: Efficiently fetch blocks from multiple CometBFT nodes with concurrent workers
  • 💾 Compressed Storage: Blocks are stored in compressed msgpack format using Brotli compression
  • 🔍 Smart Node Discovery: Automatically discover and select the best nodes from the network
  • Iterator Interface: Stream through blocks with Go's iterator pattern
  • 🌐 RPC Mirror: Serve stored blockchain data via CometBFT-compatible JSON-RPC endpoints
  • 🔧 Configurable Sync: Flexible sync configuration with version constraints and chunk sizing

Installation

go get github.com/ehsanranjbar/cometdump

Usage

Example

This example demonstrates syncing blocks from a CometBFT node, iterating through stored blocks, and accessing specific blocks:

package main

import (
    "context"
    "fmt"
    "log"
    "log/slog"
    "os"

    "github.com/ehsanranjbar/cometdump"
)

func main() {
    // Open or create a store directory
    store, err := cometdump.Open("./blockchain-data")
    if err != nil {
        log.Fatal(err)
    }

    // Create a logger
    logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.LevelInfo,
    }))
    // Configure sync options and sync blocks from CometBFT nodes
    config := cometdump.DefaultSyncConfig("https://cosmos-rpc.publicnode.com:443/").
        WithExpandRemotes(true).                    // Discover additional nodes
        WithUseLatestVersion(true).                 // Use nodes with latest version
        WithTargetHeight(1000).                     // Sync up to block 1,000
        WithLogger(logger.With("module", "sync"))

    ctx := context.Background()
    err = store.Sync(ctx, config)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Sync completed!")

    // Iterate through all stored blocks using the Blocks method
    fmt.Println("\nIterating through blocks:")
    for block, err := range store.Blocks(1, 10) {
        if err != nil {
            log.Printf("Error reading block: %v", err)
            continue
        }

        fmt.Printf("Block Height: %d, Hash: %s, Txs: %d\n",
            block.Block.Height,
            block.Block.Hash(),
            len(block.Block.Data.Txs))
    }
}
RPC Server Example

CometDump includes an RPC server that provides CometBFT-compatible JSON-RPC endpoints for accessing stored blockchain data:

package main

import (
    "log"
    "net/http"
    "os"

    cometlog "github.com/cometbft/cometbft/v2/libs/log"
    rpcserver "github.com/cometbft/cometbft/v2/rpc/jsonrpc/server"
    "github.com/ehsanranjbar/cometdump"
)

func main() {
    // Open the store
    store, err := cometdump.Open("./blockchain-data")
    if err != nil {
        log.Fatal(err)
    }

    // Create RPC server and register routes
    mux := http.NewServeMux()
    server := cometdump.NewRPCServer(store)
    logger := cometlog.NewLogger(os.Stdout)
    rpcserver.RegisterRPCFuncs(mux, server.GetRoutes(), logger)

    // Start the RPC server
    listener, err := rpcserver.Listen("tcp://localhost:8080", 0)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Starting RPC server", "address", "http://localhost:8080")

    if err := rpcserver.Serve(listener, mux, logger, rpcserver.DefaultConfig()); err != nil {
        log.Fatal(err)
    }
}

The RPC server provides the following CometBFT-compatible endpoints:

  • GET /block?height=N - Get block at specific height
  • GET /block_results?height=N - Get block results (transaction results, events)
  • GET /blockchain?minHeight=N&maxHeight=M - Get block headers in range
  • GET /header?height=N - Get block header at specific height

CLI Usage

CometDump provides a powerful CLI tool for syncing, serving and accessing dump store.

CLI Example
# Sync blocks from a specific node up to height 100000
cometdump sync --remote https://rpc.cosmos.network:443 --height 100000

# Start RPC serve server on port 8080
cometdump serve --addr 0.0.0.0:8080

# Get block summary at height 12345
cometdump block --height 12345

Requirements

  • Go 1.23.5 or later
  • Compatible with CometBFT v2.x nodes

License

This project is licensed under the MIT License - see the LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit issues, feature requests, or pull requests.

  • CometBFT - The underlying consensus engine
  • Cosmos SDK - Framework for building blockchain applications

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockRecord

type BlockRecord struct {
	BlockID               cmtypes.BlockID
	Block                 *cmtypes.Block
	TxResults             []*abcitypes.ExecTxResult
	FinalizeBlockEvents   []abcitypes.Event
	ValidatorUpdates      []abcitypes.ValidatorUpdate
	ConsensusParamUpdates *cmtproto.ConsensusParams
	BlockResultsAppHash   []byte
}

BlockRecord is a structure that holds the details of a block, including its ID, transactions, events, and other related data. It is used to encode and decode block data in a msgpack format.

func BlockRecordFromRPCResults

func BlockRecordFromRPCResults(block *coretypes.ResultBlock, blockResults *coretypes.ResultBlockResults) *BlockRecord

BlockRecordFromRPCResults creates a BlockRecord from the given ResultBlock and ResultBlockResults.

func (*BlockRecord) DecodeMsgpack

func (b *BlockRecord) DecodeMsgpack(dec *msgpack.Decoder) error

DecodeMsgpack decodes the Block from a msgpack format.

func (*BlockRecord) EncodeMsgpack

func (b *BlockRecord) EncodeMsgpack(enc *msgpack.Encoder) error

EncodeMsgpack encodes the Block into a msgpack format.

func (*BlockRecord) ToResultBlock

func (b *BlockRecord) ToResultBlock() *coretypes.ResultBlock

ToResultBlock converts the resultBlockProto back to a ResultBlock.

func (*BlockRecord) ToResultBlockResults

func (b *BlockRecord) ToResultBlockResults() *coretypes.ResultBlockResults

ToResultBlockResults converts the Block to a ResultBlockResults.

type Chunk added in v0.2.2

type Chunk struct {
	FromHeight int64
	ToHeight   int64
}

Chunk represents a range of block heights stored in a single chunk file.

type NoNodesAvailableForRangeError

type NoNodesAvailableForRangeError struct {
	// contains filtered or unexported fields
}

NoNodesAvailableForRangeError is returned when no nodes are available for the given height range.

func (*NoNodesAvailableForRangeError) Error

func (*NoNodesAvailableForRangeError) Range

Range returns the height range for which no nodes are available.

type OpenOptions

type OpenOptions struct {
	// Path to the store directory.
	Path string
	// Logger is the logger to use for logging during store operations.
	Logger *slog.Logger
}

OpenOptions defines options for opening a store.

func DefaultOpenOptions

func DefaultOpenOptions(path string) OpenOptions

DefaultOpenOptions provides default options for opening a store.

func (OpenOptions) WithLogger

func (o OpenOptions) WithLogger(logger *slog.Logger) OpenOptions

WithLogger sets the logger for the store operations.

type RPCServer

type RPCServer struct {
	// contains filtered or unexported fields
}

RPCServer is a struct that provides methods to handle RPC requests for the cometdump store. It contains a reference to the Store, which is used to access blockchain data.

func NewRPCServer

func NewRPCServer(store *Store) *RPCServer

NewRPCServer creates a new RPC server for the cometdump store.

func (*RPCServer) Block

func (server *RPCServer) Block(_ *rpctypes.Context, heightPtr *int64) (*coretypes.ResultBlock, error)

Block gets block at a given height. If no height is provided, it will fetch the latest block. More: https://docs.cometbft.com/main/rpc/#/Info/block

func (*RPCServer) BlockResults

func (server *RPCServer) BlockResults(_ *rpctypes.Context, heightPtr *int64) (*coretypes.ResultBlockResults, error)

BlockResults gets ABCIResults at a given height. If no height is provided, it will fetch results for the latest block.

Results are for the height of the block containing the txs. Thus response.results.deliver_tx[5] is the results of executing getBlock(h).Txs[5] More: https://docs.cometbft.com/main/rpc/#/Info/block_results

func (*RPCServer) BlockchainInfo

func (server *RPCServer) BlockchainInfo(_ *rpctypes.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error)

BlockchainInfo gets block headers for minHeight <= height <= maxHeight. More: https://docs.cometbft.com/main/rpc/#/Info/blockchain

func (*RPCServer) GetRoutes

func (server *RPCServer) GetRoutes() rpccore.RoutesMap

GetRoutes returns a map of RPC function names to their corresponding RPC functions. Each function is registered with its expected parameters and caching behavior. The functions handle requests for block data, block results, blockchain information, and headers.

func (*RPCServer) Header

func (server *RPCServer) Header(_ *rpctypes.Context, heightPtr *int64) (*coretypes.ResultHeader, error)

Header gets block header at a given height. If no height is provided, it will fetch the latest header. More: https://docs.cometbft.com/main/rpc/#/Info/header

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store represents a directory where blocks are stored as chunks of data.

func Open

func Open(opts OpenOptions) (*Store, error)

Open initializes a new Store at the specified path or returns an existing one. If the directory does not exist, it will be created. It also acquires a lock on the directory to prevent concurrent access.

func (*Store) BlockAt

func (s *Store) BlockAt(height int64) (*BlockRecord, error)

BlockAt retrieves a block record by its height from the store.

func (*Store) Blocks

func (s *Store) Blocks(args ...int64) iter.Seq2[*BlockRecord, error]

Blocks returns an iterator over block records in the store. It accepts optional start and end heights to limit the range of blocks returned. If no heights are provided, it defaults to the entire range of stored blocks. The iterator yields BlockRecord objects and any errors encountered during iteration.

func (*Store) Chunks added in v0.2.2

func (s *Store) Chunks() []Chunk

Chunks returns a copy of the list of chunks in the store.

func (*Store) Normalize

func (s *Store) Normalize(chunkSize int64) error

Normalize normalizes the chunks into a new chunks list of specified size.

func (*Store) Sync

func (s *Store) Sync(ctx context.Context, opts SyncOptions) error

Sync fetches blocks up until the latest block height (or a specific height if provided) and stores them in the store directory.

func (*Store) VerifyIntegrity

func (s *Store) VerifyIntegrity(checksumFile string) error

VerifyIntegrity checks the integrity of the chunks in the store by verifying their sha256 checksums.

type SyncOptions

type SyncOptions struct {
	// Remotes is a list of node RPC endpoints to connect to.
	Remotes []string
	// ExpandRemotes indicates whether to expand the remotes by querying the chain network info.
	ExpandRemotes bool
	// VersionConstraint is a semantic version constraint for app versions of the nodes.
	VersionConstraint *semver.Constraints
	// UseLatestVersion indicates whether to use the nodes with the latest application version.
	UseLatestVersion bool
	// ChunkSize is the number of blocks to put in each file/chunk.
	ChunkSize int
	// BaseHeight is the height from which to start syncing.
	// If 0, it will start from the last stored height.
	// Or if the store is empty, it will start from earliest available height.
	BaseHeight int64
	// TargetHeight is the height up to which store should be synced.
	// If 0, it will fetch up to the latest block height.
	TargetHeight int64
	// FetchSize is the number of blocks to fetch in each RPC call.
	FetchSize int
	// NumWorkers is the number of concurrent workers to fetch blocks.
	NumWorkers int
	// OutputChan is a channel that can be optionally used to receive the BlockRecords as they are stored.
	OutputChan chan<- *BlockRecord
	// ProgressBar is the progress bar to use for displaying sync progress.
	ProgressBar *mpb.Progress
	// Logger is the logger to use for logging during the sync process.
	Logger *slog.Logger
}

SyncOptions defines options for the Sync method.

func DefaultSyncOptions

func DefaultSyncOptions(remotes ...string) SyncOptions

DefaultSyncOptions provides default options for the Sync method.

func (SyncOptions) WithBaseHeight

func (opts SyncOptions) WithBaseHeight(height int64) SyncOptions

WithBaseHeight sets the height from which to start syncing.

func (SyncOptions) WithChunkSize

func (opts SyncOptions) WithChunkSize(size int) SyncOptions

WithChunkSize sets the number of blocks to put in each file/chunk.

func (SyncOptions) WithExpandRemotes

func (opts SyncOptions) WithExpandRemotes(expand bool) SyncOptions

WithExpandRemotes sets whether to expand the remotes by querying the chain network info.

func (SyncOptions) WithFetchSize

func (opts SyncOptions) WithFetchSize(size int) SyncOptions

WithFetchSize sets the number of blocks to fetch in each RPC call.

func (SyncOptions) WithLogger

func (opts SyncOptions) WithLogger(logger *slog.Logger) SyncOptions

WithLogger sets the logger for the sync operation.

func (SyncOptions) WithNumWorkers

func (opts SyncOptions) WithNumWorkers(num int) SyncOptions

WithNumWorkers sets the number of concurrent workers to fetch blocks.

func (SyncOptions) WithOutputChan

func (opts SyncOptions) WithOutputChan(outputChan chan<- *BlockRecord) SyncOptions

WithOutputChan sets the channel to which BlockRecords will be sent as they are stored. If nil, no records will be sent to a channel. The channel will be closed automatically after the sync operation is complete.

func (SyncOptions) WithProgressBar

func (opts SyncOptions) WithProgressBar(pBar *mpb.Progress) SyncOptions

WithProgressBar sets the progress bar to use for displaying sync progress.

func (SyncOptions) WithTargetHeight

func (opts SyncOptions) WithTargetHeight(height int64) SyncOptions

WithTargetHeight sets the height up to which blocks should be fetched.

func (SyncOptions) WithUseLatestVersion

func (opts SyncOptions) WithUseLatestVersion(useLatest bool) SyncOptions

WithUseLatestVersion indicates whether to use the latest version of the remote.

func (SyncOptions) WithVersionConstraint

func (opts SyncOptions) WithVersionConstraint(constraint string) SyncOptions

WithVersionConstraint sets a version constraint for the remotes.

Directories

Path Synopsis
cmd
cometdump command
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL