Documentation
¶
Overview ¶
Package natsjs provides the nats kvstore client for go-orb.
Index ¶
- Constants
- func Provide(configData map[string]any, logger log.Logger, opts ...kvstore.Option) (kvstore.Type, error)
- func WithAllowReconnect(allowReconnect bool) kvstore.Option
- func WithBucketDescription(description string) kvstore.Option
- func WithBucketPerTable(bucketPerTable bool) kvstore.Option
- func WithCompression(compression bool) kvstore.Option
- func WithDrainTimeout(drainTimeout time.Duration) kvstore.Option
- func WithFlusherTimeout(flusherTimeout time.Duration) kvstore.Option
- func WithIgnoreAuthErrorAbort(ignoreAuthErrorAbort bool) kvstore.Option
- func WithInboxPrefix(inboxPrefix string) kvstore.Option
- func WithJSONKeyValues(jsonKeyValues bool) kvstore.Option
- func WithKeyEncoding(keyEncoding string) kvstore.Option
- func WithMaxPingsOut(maxPingsOut int) kvstore.Option
- func WithMaxReconnect(maxReconnect int) kvstore.Option
- func WithNkey(nkey string) kvstore.Option
- func WithNoCallbacksAfterClientClose(noCallbacksAfterClientClose bool) kvstore.Option
- func WithNoRandomize(noRandomize bool) kvstore.Option
- func WithPassword(password string) kvstore.Option
- func WithPingInterval(pingInterval time.Duration) kvstore.Option
- func WithProxyPath(proxyPath string) kvstore.Option
- func WithReconnectBufSize(reconnectBufSize int) kvstore.Option
- func WithReconnectJitter(reconnectJitter time.Duration) kvstore.Option
- func WithReconnectJitterTLS(reconnectJitterTLS time.Duration) kvstore.Option
- func WithReconnectWait(reconnectWait time.Duration) kvstore.Option
- func WithRetryOnFailedConnect(retryOnFailedConnect bool) kvstore.Option
- func WithServers(servers []string) kvstore.Option
- func WithSkipHostLookup(skipHostLookup bool) kvstore.Option
- func WithSubChanLen(subChanLen int) kvstore.Option
- func WithTimeout(timeout time.Duration) kvstore.Option
- func WithToken(token string) kvstore.Option
- func WithURL(url string) kvstore.Option
- func WithUseOldRequestStyle(useOldRequestStyle bool) kvstore.Option
- func WithUser(user string) kvstore.Option
- type Config
- type NatsJS
- func (n *NatsJS) Delete(key string, opts ...kvstore.DeleteOption) error
- func (n *NatsJS) DropDatabase(ctx context.Context, database string) error
- func (n *NatsJS) DropTable(ctx context.Context, database, table string) error
- func (n *NatsJS) Get(ctx context.Context, key, database, table string, _ ...kvstore.GetOption) ([]kvstore.Record, error)
- func (n *NatsJS) Keys(ctx context.Context, database, table string, opts ...kvstore.KeysOption) ([]string, error)
- func (n *NatsJS) List(opts ...kvstore.ListOption) ([]string, error)
- func (n *NatsJS) Purge(ctx context.Context, key, database, table string) error
- func (n *NatsJS) Read(key string, opts ...kvstore.ReadOption) ([]*kvstore.Record, error)
- func (n *NatsJS) Set(ctx context.Context, key, database, table string, data []byte, ...) error
- func (n *NatsJS) Start(ctx context.Context) error
- func (n *NatsJS) Stop(_ context.Context) error
- func (n *NatsJS) String() string
- func (n *NatsJS) Type() string
- func (n *NatsJS) Watch(ctx context.Context, database, table string, opts ...kvstore.WatchOption) (<-chan kvstore.WatchEvent, func() error, error)
- func (n *NatsJS) Write(r *kvstore.Record, opts ...kvstore.WriteOption) error
- type NatsOptions
Constants ¶
const ( DefaultBucketDescription = "KeyValue storage administered by go-orb" DefaultDatabase = "default" DefaultTable = "" DefaultKeyEncoding = "base32" DefaultBucketPerTable = true DefaultJSONKeyValues = false )
Defaults.
const Name = "natsjs"
Name provides the name of this kvstore client.
Variables ¶
This section is empty.
Functions ¶
func Provide ¶
func Provide( configData map[string]any, logger log.Logger, opts ...kvstore.Option, ) (kvstore.Type, error)
Provide creates a new NatsJS KVStore client.
func WithAllowReconnect ¶
WithAllowReconnect enables reconnection logic when disconnected from the server.
func WithBucketDescription ¶
WithBucketDescription sets the description for the default bucket.
func WithBucketPerTable ¶
WithBucketPerTable configures whether a separate bucket is created for each table.
func WithCompression ¶
WithCompression enables compression for websocket connections.
func WithDrainTimeout ¶
WithDrainTimeout sets the timeout for a Drain Operation to complete.
func WithFlusherTimeout ¶
WithFlusherTimeout sets the maximum time to wait for write operations to complete.
func WithIgnoreAuthErrorAbort ¶
WithIgnoreAuthErrorAbort opts out of aborting reconnect attempts on repeated auth errors.
func WithInboxPrefix ¶
WithInboxPrefix allows customizing the default _INBOX prefix.
func WithJSONKeyValues ¶
WithJSONKeyValues configures whether to store key values as JSON.
func WithKeyEncoding ¶
WithKeyEncoding sets the encoding used for keys, set to empty string for no encoding.
func WithMaxPingsOut ¶
WithMaxPingsOut sets the maximum number of pending ping commands before raising an error.
func WithMaxReconnect ¶
WithMaxReconnect sets the number of reconnect attempts before giving up.
func WithNoCallbacksAfterClientClose ¶
WithNoCallbacksAfterClientClose prevents callbacks after Close() is called.
func WithNoRandomize ¶
WithNoRandomize configures whether to randomize the server pool.
func WithPassword ¶
WithPassword sets the password for authentication.
func WithPingInterval ¶
WithPingInterval sets the period for sending ping commands to the server.
func WithProxyPath ¶
WithProxyPath adds a path to connections URL for websocket connections.
func WithReconnectBufSize ¶
WithReconnectBufSize sets the size of the backing bufio during reconnect.
func WithReconnectJitter ¶
WithReconnectJitter sets the upper bound for random delay added to ReconnectWait.
func WithReconnectJitterTLS ¶
WithReconnectJitterTLS sets the upper bound for random delay added to ReconnectWait when TLS is used.
func WithReconnectWait ¶
WithReconnectWait sets the time to backoff after attempting a reconnect.
func WithRetryOnFailedConnect ¶
WithRetryOnFailedConnect sets the connection in reconnecting state if it can't connect initially.
func WithServers ¶
WithServers sets the list of NATS servers to connect to.
func WithSkipHostLookup ¶
WithSkipHostLookup skips the DNS lookup for the server hostname.
func WithSubChanLen ¶
WithSubChanLen sets the size of the buffered channel used for SyncSubscriptions.
func WithTimeout ¶
WithTimeout sets the timeout for a Dial operation on a connection.
func WithUseOldRequestStyle ¶
WithUseOldRequestStyle forces the old method of Requests.
Types ¶
type Config ¶
type Config struct {
kvstore.Config `yaml:",inline"`
NatsOptions `yaml:",inline"`
// BucketDescription configures the description for the each bucket.
// Default: "KeyValue storage administered by go-orb"
BucketDescription string `json:"bucketDescription,omitempty" yaml:"bucketDescription,omitempty"`
// KeyEncoding configures the encoding used for keys, set it to an empty string for no encoding.
// Default: base32
KeyEncoding string `json:"keyEncoding,omitempty" yaml:"keyEncoding,omitempty"`
// BucketPerTable configures whether a separate bucket is created for each table.
// If false, all tables are stored in the same bucket.
// Default: true
// Deprecated: Disable this only if you need backwards compatibility.
BucketPerTable bool `json:"bucketPerTable,omitempty" yaml:"bucketPerTable,omitempty"`
// JSONKeyValues configures whether the key values are encoded again as JSON.
// Default: false
// Deprecated: Enable this only if you need backwards compatibility.
JSONKeyValues bool `json:"jsonKeyValues,omitempty" yaml:"jsonKeyValues,omitempty"`
}
Config provides configuration for the NATS registry.
func (*Config) ApplyOptions ¶
ApplyOptions applies a set of options to the config.
type NatsJS ¶
type NatsJS struct {
// contains filtered or unexported fields
}
NatsJS implements the kvstore.KVStore interface using NATS JetStream.
func New ¶
New creates a new NATS JetStream KVStore. This function should rarely be called manually. To create a new KVStore use Provide.
func (*NatsJS) Delete ¶
func (n *NatsJS) Delete(key string, opts ...kvstore.DeleteOption) error
Delete removes the record with the corresponding key from the store. Deprecated: use Remove instead.
func (*NatsJS) DropDatabase ¶
DropDatabase drops the database.
func (*NatsJS) Get ¶
func (n *NatsJS) Get(ctx context.Context, key, database, table string, _ ...kvstore.GetOption) ([]kvstore.Record, error)
Get takes a key, database, table and optional GetOptions. It returns the Record or an error.
func (*NatsJS) Keys ¶
func (n *NatsJS) Keys(ctx context.Context, database, table string, opts ...kvstore.KeysOption) ([]string, error)
Keys returns any keys that match, or an empty list with no error if none matched.
func (*NatsJS) List ¶
func (n *NatsJS) List(opts ...kvstore.ListOption) ([]string, error)
List returns any keys that match, or an empty list with no error if none matched. Deprecated: use Keys instead.
func (*NatsJS) Read ¶
Read takes a single key and optional ReadOptions. It returns matching []*Record or an error. Deprecated: use Get instead.
func (*NatsJS) Set ¶
func (n *NatsJS) Set(ctx context.Context, key, database, table string, data []byte, _ ...kvstore.SetOption) error
Set takes a key, database, table and data, and optional SetOptions.
type NatsOptions ¶
type NatsOptions struct {
// URL represents a single NATS server url to which the client
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
URL string `json:"url,omitempty" yaml:"url,omitempty"`
// InProcessServer represents a NATS server running within the
// same process. If this is set then we will attempt to connect
// to the server directly rather than using external TCP conns.
InProcessServer nats.InProcessConnProvider `json:"-" yaml:"-"`
// Servers is a configured set of servers which this client
// will use when attempting to connect.
Servers []string `json:"servers,omitempty" yaml:"servers,omitempty"`
// NoRandomize configures whether we will randomize the
// server pool.
NoRandomize bool `json:"noRandomize,omitempty" yaml:"noRandomize,omitempty"`
// AllowReconnect enables reconnection logic to be used when we
// encounter a disconnect from the current server.
AllowReconnect bool `json:"allowReconnect,omitempty" yaml:"allowReconnect,omitempty"`
// MaxReconnect sets the number of reconnect attempts that will be
// tried before giving up. If negative, then it will never give up
// trying to reconnect.
// Defaults to 60.
MaxReconnect int `json:"maxReconnect,omitempty" yaml:"maxReconnect,omitempty"`
// ReconnectWait sets the time to backoff after attempting a reconnect
// to a server that we were already connected to previously.
// Defaults to 2s.
ReconnectWait config.Duration `json:"reconnectWait,omitempty" yaml:"reconnectWait,omitempty"`
// CustomReconnectDelayCB is invoked after the library tried every
// URL in the server list and failed to reconnect. It passes to the
// user the current number of attempts. This function returns the
// amount of time the library will sleep before attempting to reconnect
// again. It is strongly recommended that this value contains some
// jitter to prevent all connections to attempt reconnecting at the same time.
CustomReconnectDelayCB nats.ReconnectDelayHandler `json:"-" yaml:"-"`
// ReconnectJitter sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when no TLS is used.
// Defaults to 100ms.
ReconnectJitter config.Duration `json:"reconnectJitter,omitempty" yaml:"reconnectJitter,omitempty"`
// ReconnectJitterTLS sets the upper bound for a random delay added to
// ReconnectWait during a reconnect when TLS is used.
// Defaults to 1s.
ReconnectJitterTLS config.Duration `json:"reconnectJitterTls,omitempty" yaml:"reconnectJitterTls,omitempty"`
// Timeout sets the timeout for a Dial operation on a connection.
// Defaults to 2s.
Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// DrainTimeout sets the timeout for a Drain Operation to complete.
// Defaults to 30s.
DrainTimeout config.Duration `json:"drainTimeout,omitempty" yaml:"drainTimeout,omitempty"`
// FlusherTimeout is the maximum time to wait for write operations
// to the underlying connection to complete (including the flusher loop).
// Defaults to 1m.
FlusherTimeout config.Duration `json:"flusherTimeout,omitempty" yaml:"flusherTimeout,omitempty"`
// PingInterval is the period at which the client will be sending ping
// commands to the server, disabled if 0 or negative.
// Defaults to 2m.
PingInterval config.Duration `json:"pingInterval,omitempty" yaml:"pingInterval,omitempty"`
// MaxPingsOut is the maximum number of pending ping commands that can
// be awaiting a response before raising an ErrStaleConnection error.
// Defaults to 2.
MaxPingsOut int `json:"maxPingsOut,omitempty" yaml:"maxPingsOut,omitempty"`
// ClosedCB sets the closed handler that is called when a client will
// no longer be connected.
ClosedCB nats.ConnHandler `json:"-" yaml:"-"`
// DisconnectedErrCB sets the disconnected error handler that is called
// whenever the connection is disconnected.
// Disconnected error could be nil, for instance when user explicitly closes the connection.
// DisconnectedCB will not be called if DisconnectedErrCB is set
DisconnectedErrCB nats.ConnErrHandler `json:"-" yaml:"-"`
// ConnectedCB sets the connected handler called when the initial connection
// is established. It is not invoked on successful reconnects - for reconnections,
// use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
// to detect whether the initial connect was successful.
ConnectedCB nats.ConnHandler `json:"-" yaml:"-"`
// ReconnectedCB sets the reconnected handler called whenever
// the connection is successfully reconnected.
ReconnectedCB nats.ConnHandler `json:"-" yaml:"-"`
// DiscoveredServersCB sets the callback that is invoked whenever a new
// server has joined the cluster.
DiscoveredServersCB nats.ConnHandler `json:"-" yaml:"-"`
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
AsyncErrorCB nats.ErrHandler `json:"-" yaml:"-"`
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
// Defaults to 8388608 bytes (8MB).
ReconnectBufSize int `json:"reconnectBufSize,omitempty" yaml:"reconnectBufSize,omitempty"`
// SubChanLen is the size of the buffered channel used between the socket
// Go routine and the message delivery for SyncSubscriptions.
// NOTE: This does not affect AsyncSubscriptions which are
// dictated by PendingLimits()
// Defaults to 65536.
SubChanLen int `json:"subChanLen,omitempty" yaml:"subChanLen,omitempty"`
// UserJWT sets the callback handler that will fetch a user's JWT.
UserJWT nats.UserJWTHandler `json:"-" yaml:"-"`
// Nkey sets the public nkey that will be used to authenticate
// when connecting to the server. UserJWT and Nkey are mutually exclusive
// and if defined, UserJWT will take precedence.
Nkey string `json:"nkey,omitempty" yaml:"nkey,omitempty"`
// SignatureCB designates the function used to sign the nonce
// presented from the server.
SignatureCB nats.SignatureHandler `json:"-" yaml:"-"`
// User sets the username to be used when connecting to the server.
User string `json:"user,omitempty" yaml:"user,omitempty"`
// Password sets the password to be used when connecting to a server.
Password string `json:"password,omitempty" yaml:"password,omitempty"`
// Token sets the token to be used when connecting to a server.
Token string `json:"token,omitempty" yaml:"token,omitempty"`
// TokenHandler designates the function used to generate the token to be used when connecting to a server.
TokenHandler nats.AuthTokenHandler `json:"-" yaml:"-"`
// CustomDialer allows to specify a custom dialer (not necessarily
// a *net.Dialer).
CustomDialer nats.CustomDialer `json:"-" yaml:"-"`
// UseOldRequestStyle forces the old method of Requests that utilize
// a new Inbox and a new Subscription for each request.
UseOldRequestStyle bool `json:"useOldRequestStyle,omitempty" yaml:"useOldRequestStyle,omitempty"`
// NoCallbacksAfterClientClose allows preventing the invocation of
// callbacks after Close() is called. Client won't receive notifications
// when Close is invoked by user code. Default is to invoke the callbacks.
NoCallbacksAfterClientClose bool `json:"noCallbacksAfterClientClose,omitempty" yaml:"noCallbacksAfterClientClose,omitempty"`
// LameDuckModeHandler sets the callback to invoke when the server notifies
// the connection that it entered lame duck mode, that is, going to
// gradually disconnect all its connections before shutting down. This is
// often used in deployments when upgrading NATS Servers.
LameDuckModeHandler nats.ConnHandler `json:"-" yaml:"-"`
// RetryOnFailedConnect sets the connection in reconnecting state right
// away if it can't connect to a server in the initial set. The
// MaxReconnect and ReconnectWait options are used for this process,
// similarly to when an established connection is disconnected.
// If a ReconnectHandler is set, it will be invoked on the first
// successful reconnect attempt (if the initial connect fails),
// and if a ClosedHandler is set, it will be invoked if
// it fails to connect (after exhausting the MaxReconnect attempts).
RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty" yaml:"retryOnFailedConnect,omitempty"`
// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool `json:"compression,omitempty" yaml:"compression,omitempty"`
// For websocket connections, adds a path to connections url.
// This is useful when connecting to NATS behind a proxy.
ProxyPath string `json:"proxyPath,omitempty" yaml:"proxyPath,omitempty"`
// InboxPrefix allows the default _INBOX prefix to be customized
InboxPrefix string `json:"inboxPrefix,omitempty" yaml:"inboxPrefix,omitempty"`
// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
IgnoreAuthErrorAbort bool `json:"ignoreAuthErrorAbort,omitempty" yaml:"ignoreAuthErrorAbort,omitempty"`
// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool `json:"skipHostLookup,omitempty" yaml:"skipHostLookup,omitempty"`
}
NatsOptions can be used to create a customized connection.
func (NatsOptions) ToOptions ¶
func (o NatsOptions) ToOptions() nats.Options
ToOptions converts the NatsOptions to nats.Options.