Documentation ¶
Overview ¶
Package beanstalk provides a beanstalk client.
Producer ¶
The Producer is used to put jobs into tubes. It provides a connection pool:
producer, err := beanstalk.NewProducer([]string{"localhost:11300"}, beanstalk.Config{
	// Multiply the list of URIs to create a larger pool of connections.
	Multiply: 3,
})
if err != nil {
	// handle error
}
defer producer.Stop()
Putting a job in a tube is done by calling Put, which will select a random connection for its operation:
// Create the put parameters. These can be reused between Put calls.
params := beanstalk.PutParams{Priority: 1024, Delay: 0, TTR: 30 * time.Second}
// Put the "Hello World" message in the "mytube" tube.
id, err := producer.Put(ctx, "mytube", []byte("Hello World"), params)
if err != nil {
	// handle error
}
If a Put operation fails on a connection, another connection in the pool will be selected for a retry.
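The retry behaviour can be pictured with a small, self-contained sketch. It does not use this package's internals: `putter`, `fakeConn` and `putWithRetry` are hypothetical names, and the strategy shown (random order, each connection tried at most once) is an assumption rather than the documented algorithm.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// putter is a hypothetical stand-in for a pooled connection.
type putter interface {
	Put(body []byte) (uint64, error)
}

// fakeConn either fails every Put or returns a fixed job ID.
type fakeConn struct {
	id   uint64
	fail bool
}

func (c fakeConn) Put(body []byte) (uint64, error) {
	if c.fail {
		return 0, errors.New("connection unavailable")
	}
	return c.id, nil
}

// putWithRetry tries the connections in random order and returns the first
// successful result, or the last error if every connection failed.
func putWithRetry(pool []putter, body []byte) (uint64, error) {
	err := errors.New("empty pool")
	for _, i := range rand.Perm(len(pool)) {
		var id uint64
		if id, err = pool[i].Put(body); err == nil {
			return id, nil
		}
	}
	return 0, err
}

func main() {
	pool := []putter{fakeConn{fail: true}, fakeConn{id: 42}, fakeConn{fail: true}}
	id, err := putWithRetry(pool, []byte("Hello World"))
	fmt.Println(id, err) // 42 <nil>
}
```

Because only one fake connection is healthy, the call succeeds with job ID 42 regardless of the order in which the connections are tried.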
Consumer ¶
The Consumer is used to reserve jobs from tubes. It provides a connection pool:
consumer, err := beanstalk.NewConsumer([]string{"localhost:11300"}, []string{"mytube"}, beanstalk.Config{
	// Multiply the list of URIs to create a larger pool of connections.
	Multiply: 3,
	// NumGoroutines is the number of goroutines that the Receive method will
	// spin up to process jobs concurrently.
	NumGoroutines: 30,
})
if err != nil {
	// handle error
}
The ratio of Multiply to NumGoroutines is important. Multiply determines the size of the connection pool, and NumGoroutines determines how many reserved jobs can be in flight. With few connections but many reserved jobs in flight, the TCP connection pool may become congested and processing speed will suffer. Although the right ratio depends on how quickly jobs are processed, a good rule of thumb is 1:10.
Reserving jobs from the tubes specified in NewConsumer is done by calling Receive, which will reserve jobs on any of the connections in the pool:
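As a quick arithmetic sketch of that rule of thumb: the helpers below are hypothetical and assume that the pool holds one connection per URI for each multiple, which is an inference from the Multiply comment rather than a documented formula.

```go
package main

import "fmt"

// poolSize assumes one connection per URI per multiple.
func poolSize(numURIs, multiply int) int {
	if multiply < 1 {
		multiply = 1 // the documented default for Multiply
	}
	return numURIs * multiply
}

// suggestedGoroutines applies the 1:10 rule of thumb.
func suggestedGoroutines(connections int) int {
	return connections * 10
}

func main() {
	conns := poolSize(1, 3)
	fmt.Println(conns, suggestedGoroutines(conns)) // 3 30
}
```

One URI with Multiply set to 3 gives three connections and thirty goroutines, which matches the values used in the NewConsumer example.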
// Call the inline function for every job that was reserved.
consumer.Receive(ctx, func(ctx context.Context, job *beanstalk.Job) {
	// handle job
	if err := job.Delete(ctx); err != nil {
		// handle error
	}
})
If the context passed to Receive is cancelled, Receive will finish processing the jobs it has reserved before returning.
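The shutdown behaviour follows a common worker-pool pattern, sketched here with the standard library only. The `receive` function is a hypothetical stand-in and not this package's implementation: workers stop picking up new jobs once the context is cancelled, but a job that has already been handed to a worker runs to completion.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// receive starts the given number of workers and blocks until all of them
// have returned, which happens only after ctx is cancelled.
func receive(ctx context.Context, jobs <-chan string, workers int, handle func(string)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case job := <-jobs:
					handle(job)
				}
			}
		}()
	}
	wg.Wait() // returns only after every worker has finished its current job
}

// demoReceive cancels the context while a job is being handled and reports
// how many jobs were fully processed by the time receive returned.
func demoReceive() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan string, 1)
	jobs <- "job-1"

	processed := 0
	receive(ctx, jobs, 1, func(job string) {
		cancel()    // cancellation arrives in the middle of the job
		processed++ // the job still runs to completion
	})
	return processed
}

func main() {
	fmt.Println(demoReceive()) // 1
}
```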
Job ¶
When Receive offers a job, the receiving goroutine is responsible for processing that job and finishing it. A job can be deleted, released or buried:
// Delete a job, when processing was successful.
err = job.Delete(ctx)
// Release a job, putting it back in the queue for another worker to pick up.
err = job.Release(ctx)
// Release a job, but put it back with a custom priority and a delay before
// it's offered to another worker.
err = job.ReleaseWithParams(ctx, 512, 5*time.Second)
// Bury a job, when it doesn't need to be processed but needs to be kept
// around for manual inspection or manual requeuing.
err = job.Bury(ctx)
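One way to choose between the three outcomes is to map the result of processing onto them. The sketch below is an assumption about how an application might do this, not something the package prescribes: `finisher`, `finish`, `errPermanent`, `errTemporary` and `recorder` are hypothetical names, while the three method signatures are taken from the index of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// finisher lists the Job methods used here; a *beanstalk.Job should satisfy
// it, given the signatures in the index.
type finisher interface {
	Delete(ctx context.Context) error
	Release(ctx context.Context) error
	Bury(ctx context.Context) error
}

// Hypothetical application errors used to classify a failed job.
var (
	errPermanent = errors.New("permanent failure")
	errTemporary = errors.New("temporary failure")
)

// finish deletes a job that succeeded, buries one that can never succeed,
// and releases everything else for another attempt.
func finish(ctx context.Context, job finisher, procErr error) error {
	switch {
	case procErr == nil:
		return job.Delete(ctx)
	case errors.Is(procErr, errPermanent):
		return job.Bury(ctx)
	default:
		return job.Release(ctx)
	}
}

// recorder is a fake job that remembers which method was called.
type recorder struct{ action string }

func (r *recorder) Delete(context.Context) error  { r.action = "delete"; return nil }
func (r *recorder) Release(context.Context) error { r.action = "release"; return nil }
func (r *recorder) Bury(context.Context) error    { r.action = "bury"; return nil }

// outcome runs finish against a fake job and returns the chosen action.
func outcome(procErr error) string {
	r := &recorder{}
	_ = finish(context.Background(), r, procErr)
	return r.action
}

func main() {
	fmt.Println(outcome(nil))                                         // delete
	fmt.Println(outcome(errTemporary))                                // release
	fmt.Println(outcome(fmt.Errorf("bad payload: %w", errPermanent))) // bury
}
```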
Conn ¶
If the Producer and Consumer abstractions are too high-level, Conn provides the lower-level abstraction of a single connection to a beanstalk server:
conn, err := beanstalk.Dial("localhost:11300", beanstalk.Config{})
if err != nil {
	// handle error
}
defer conn.Close()
// conn.Put(...)
// conn.Watch(...)
// conn.Reserve(...)
Logging ¶
The Config structure offers hooks for info and error logs, which allow plugging in a custom logging solution.
config := beanstalk.Config{
	InfoFunc: func(message string) {
		log.Info(message)
	},
	ErrorFunc: func(err error, message string) {
		log.WithError(err).Error(message)
	},
}
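If no structured logger is available, functions with the same signatures as InfoFunc and ErrorFunc can be built on the standard library `log` package. This is a minimal sketch: `newHooks` and `demoHooks` are hypothetical helpers, and the "INFO"/"ERROR" prefixes are an arbitrary choice.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
)

// newHooks returns two functions whose signatures match the InfoFunc and
// ErrorFunc fields of Config, both writing to w.
func newHooks(w io.Writer) (info func(message string), errFn func(err error, message string)) {
	logger := log.New(w, "", 0) // no timestamp, to keep the output predictable
	info = func(message string) { logger.Printf("INFO %s", message) }
	errFn = func(err error, message string) { logger.Printf("ERROR %s: %v", message, err) }
	return info, errFn
}

// demoHooks calls both hooks once and returns everything that was logged.
func demoHooks() string {
	var buf strings.Builder
	info, errFn := newHooks(&buf)
	info("connected")
	errFn(errors.New("EOF"), "read failed")
	return buf.String()
}

func main() {
	fmt.Print(demoHooks())
	// INFO connected
	// ERROR read failed: EOF
}
```

The two returned functions could then be assigned to the InfoFunc and ErrorFunc fields of a Config value.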
URIs ¶
NewProducer, NewConsumer and Dial take a URI or a list of URIs as their first argument, which can be written in various formats. In the above examples the beanstalk server was referenced by the host:port notation. This package also supports URI formats like beanstalk:// for a plaintext connection, and beanstalks:// or tls:// for encrypted connections.
For encrypted connections, the port defaults to 11400 if none is specified, rather than the plaintext default of 11300.
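The scheme and default-port rules can be illustrated with a small resolver. This is a sketch of the rules as described here, not the package's own parser: `resolve` is a hypothetical function, and edge cases such as IPv6 literals or paths are deliberately ignored.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// resolve turns a URI into a host:port address and reports whether the
// connection should use TLS.
func resolve(uri string) (addr string, useTLS bool, err error) {
	scheme, rest, found := strings.Cut(uri, "://")
	if !found {
		scheme, rest = "beanstalk", uri // bare host or host:port notation
	}

	port := "11300" // plaintext default
	switch scheme {
	case "beanstalk":
	case "beanstalks", "tls":
		useTLS, port = true, "11400" // encrypted default
	default:
		return "", false, fmt.Errorf("unsupported scheme %q", scheme)
	}

	if _, _, splitErr := net.SplitHostPort(rest); splitErr == nil {
		return rest, useTLS, nil // an explicit port always wins
	}
	return net.JoinHostPort(rest, port), useTLS, nil
}

func main() {
	for _, uri := range []string{"localhost:11300", "beanstalk://localhost", "tls://example.com"} {
		addr, useTLS, _ := resolve(uri)
		fmt.Println(addr, useTLS)
	}
	// localhost:11300 false
	// localhost:11300 false
	// example.com:11400 true
}
```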
Index ¶
- Variables
- func ValidURIs(uris []string) error
- type Config
- type Conn
- func (conn *Conn) Close() error
- func (conn *Conn) Delete(ctx context.Context, id uint64) error
- func (conn *Conn) Ignore(ctx context.Context, tube string) error
- func (conn *Conn) Kick(ctx context.Context, tube string, bound int) (int64, error)
- func (conn *Conn) ListTubes(ctx context.Context) ([]string, error)
- func (conn *Conn) PeekBuried(ctx context.Context, tube string) (*Job, error)
- func (conn *Conn) PeekDelayed(ctx context.Context, tube string) (*Job, error)
- func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)
- func (conn *Conn) Reserve(ctx context.Context) (*Job, error)
- func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error)
- func (conn *Conn) String() string
- func (conn *Conn) TubeStats(ctx context.Context, tube string) (TubeStats, error)
- func (conn *Conn) Watch(ctx context.Context, tube string) error
- type Consumer
- type Job
- func (job *Job) Bury(ctx context.Context) error
- func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error
- func (job *Job) Delete(ctx context.Context) error
- func (job *Job) Kick(ctx context.Context) error
- func (job *Job) Release(ctx context.Context) error
- func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error
- func (job *Job) Touch(ctx context.Context) error
- func (job *Job) TouchAfter() time.Duration
- type Producer
- type PutParams
- type TubeStats
Variables ¶
var (
	ErrBuried       = errors.New("job was buried")
	ErrDeadlineSoon = errors.New("deadline soon")
	ErrDisconnected = errors.New("client disconnected")
	ErrDraining     = errors.New("server is draining")
	ErrNotFound     = errors.New("job not found")
	ErrTimedOut     = errors.New("reserve timed out")
	ErrTooBig       = errors.New("job too big")
	ErrNotIgnored   = errors.New("tube not ignored")
	ErrTubeTooLong  = errors.New("tube name too long")
	ErrUnexpected   = errors.New("unexpected response received")
)
These errors may be returned by any of Conn's methods.
var ErrJobFinished = errors.New("job was already finished")
ErrJobFinished is returned when a job was already finished.
Functions ¶
Types ¶
type Config ¶ added in v1.3.0
type Config struct {
	// Multiply the list of URIs to create a larger pool of connections.
	//
	// The default is to have 1.
	Multiply int
	// NumGoroutines is the number of goroutines that the Receive method will
	// spin up to process jobs concurrently.
	//
	// The default is to spin up 10 goroutines.
	NumGoroutines int
	// ConnTimeout configures the read and write timeout of the connection. This
	// can be overridden by a context deadline if its value is lower.
	//
	// Note that this does not work with Reserve() and might interfere with
	// ReserveWithTimeout() if configured incorrectly.
	//
	// The default is to have no timeout.
	ConnTimeout time.Duration
	// ReserveTimeout is the time a consumer connection waits between reserve
	// attempts if the last attempt failed to reserve a job.
	//
	// The default is to wait 1 second.
	ReserveTimeout time.Duration
	// ReconnectTimeout is the timeout between reconnects.
	//
	// The default is to wait 3 seconds.
	ReconnectTimeout time.Duration
	// TLSConfig describes the configuration that is used when Dial() makes a
	// TLS connection.
	TLSConfig *tls.Config
	// InfoFunc is called to log informational messages.
	InfoFunc func(message string)
	// ErrorFunc is called to log error messages.
	ErrorFunc func(err error, message string)
	// IgnoreURIValidation skips the call to ValidURIs() during initialization.
	IgnoreURIValidation bool
}
Config is used to configure a Consumer, Producer or Conn.
type Conn ¶ added in v1.3.0
type Conn struct {
	URI string
	// contains filtered or unexported fields
}
Conn describes a connection to a beanstalk server.
func (*Conn) Kick ¶ added in v1.3.3
Kick one or more jobs in the specified tube. This function returns the number of jobs that were kicked.
func (*Conn) PeekBuried ¶ added in v1.3.3
PeekBuried peeks at a buried job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.
func (*Conn) PeekDelayed ¶ added in v1.4.2
PeekDelayed peeks at a delayed job on the specified tube and returns the job. If there are no jobs to peek at, this function will return without a job or error.
func (*Conn) Put ¶ added in v1.3.0
func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)
Put a job in the specified tube.
func (*Conn) ReserveWithTimeout ¶ added in v1.3.0
ReserveWithTimeout tries to reserve a job, blocking for at most timeout. If no job could be reserved, this function will return without a job or error.
type Consumer ¶
type Consumer struct {
	// contains filtered or unexported fields
}
Consumer maintains a pool of connections and allows workers to reserve jobs on those connections.
func NewConsumer ¶
NewConsumer returns a new Consumer.
type Job ¶
type Job struct {
	ID         uint64
	Body       []byte
	ReservedAt time.Time
	Stats      struct {
		PutParams `yaml:",inline"`
		Tube      string        `yaml:"tube"`
		State     string        `yaml:"state"`
		Age       time.Duration `yaml:"age"`
		TimeLeft  time.Duration `yaml:"time-left"`
		File      int           `yaml:"file"`
		Reserves  int           `yaml:"reserves"`
		Timeouts  int           `yaml:"timeouts"`
		Releases  int           `yaml:"releases"`
		Buries    int           `yaml:"buries"`
		Kicks     int           `yaml:"kicks"`
	}
	// contains filtered or unexported fields
}
Job describes a beanstalk job and its stats.
func (*Job) BuryWithPriority ¶
BuryWithPriority buries this job with the specified priority.
func (*Job) ReleaseWithParams ¶
ReleaseWithParams releases this job back with the specified priority and delay.
func (*Job) TouchAfter ¶ added in v1.3.0
TouchAfter returns the duration until this job needs to be touched for its reservation to be retained.
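For long-running jobs, TouchAfter can drive a loop that touches the job each time the returned duration elapses. The sketch below is one possible way to do that, not a pattern the package prescribes: `toucher`, `keepAlive` and `fakeJob` are hypothetical names, while the Touch and TouchAfter signatures are taken from the index.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// toucher lists the two Job methods used here.
type toucher interface {
	Touch(ctx context.Context) error
	TouchAfter() time.Duration
}

// keepAlive touches the job each time TouchAfter elapses, until ctx is
// cancelled or a Touch call fails.
func keepAlive(ctx context.Context, job toucher) error {
	for {
		timer := time.NewTimer(job.TouchAfter())
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
			if err := job.Touch(ctx); err != nil {
				return err
			}
		}
	}
}

// fakeJob is a test double: it asks to be touched every millisecond and
// fails on the third touch, which ends the loop.
type fakeJob struct{ touches int }

func (j *fakeJob) TouchAfter() time.Duration { return time.Millisecond }

func (j *fakeJob) Touch(context.Context) error {
	j.touches++
	if j.touches == 3 {
		return errors.New("job was already finished")
	}
	return nil
}

// demoKeepAlive runs keepAlive against the fake and reports the touch count.
func demoKeepAlive() int {
	job := &fakeJob{}
	_ = keepAlive(context.Background(), job)
	return job.touches
}

func main() {
	fmt.Println(demoKeepAlive()) // 3
}
```

In a real handler, keepAlive would typically run in its own goroutine with a context that is cancelled as soon as processing ends.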
type Producer ¶
type Producer struct {
	// contains filtered or unexported fields
}
Producer maintains a pool of connections to beanstalk servers on which it inserts jobs.
func NewProducer ¶
NewProducer returns a new Producer.
func (*Producer) IsConnected ¶ added in v1.4.0
IsConnected returns true when at least one producer in the pool is connected.
type PutParams ¶
type PutParams struct {
	Priority uint32        `yaml:"pri"`
	Delay    time.Duration `yaml:"delay"`
	TTR      time.Duration `yaml:"ttr"`
}
PutParams are the parameters used to perform a Put operation.
type TubeStats ¶ added in v1.2.2
type TubeStats struct {
	Name            string        `yaml:"name"`
	UrgentJobs      int64         `yaml:"current-jobs-urgent"`
	ReadyJobs       int64         `yaml:"current-jobs-ready"`
	ReservedJobs    int64         `yaml:"current-jobs-reserved"`
	DelayedJobs     int64         `yaml:"current-jobs-delayed"`
	BuriedJobs      int64         `yaml:"current-jobs-buried"`
	TotalJobs       int64         `yaml:"total-jobs"`
	CurrentUsing    int64         `yaml:"current-using"`
	CurrentWatching int64         `yaml:"current-watching"`
	CurrentWaiting  int64         `yaml:"current-waiting"`
	Deletes         int64         `yaml:"cmd-delete"`
	Pauses          int64         `yaml:"cmd-pause-tube"`
	Pause           time.Duration `yaml:"pause"`
	PauseLeft       time.Duration `yaml:"pause-time-left"`
}
TubeStats describe the statistics of a beanstalk tube.