Documentation
¶
Index ¶
- Constants
- func Contains(s []string, e string) bool
- func ConvertKafkaMessageToTimber(message *sarama.ConsumerMessage) (timber pb.Timber, err error)
- func ConvertTimberToEsDocumentString(timber pb.Timber, m *jsonpb.Marshaler) string
- func ConvertTimberToKafkaMessage(timber *pb.Timber, topic string) *sarama.ProducerMessage
- func GetApplicationSecretCollection() []string
- func InstruApplicationSecret(appSecret string)
- func NewDummyKafkaFactory() *dummyKafkaFactory
- func NewDummyRateLimiter() *dummyRateLimiter
- func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, ...) (client elasticClient, err error)
- func NewEsConfig(indexMethod string, bulkSize int, flushMs time.Duration, printTPS bool) esConfig
- type ApplicationSecretCollection
- type BaritoConsumerService
- type ClusterConsumer
- type ConsumerWorker
- type ELasticTestHandler
- type Elastic
- type ElasticRetrier
- type KafkaAdmin
- type KafkaFactory
- type LeakyBucket
- type ProducerService
- type RateLimiter
Constants ¶
View Source
const ( ErrConvertKafkaMessage = errkit.Error("Convert KafkaMessage Failed") ErrStore = errkit.Error("Store Failed") ErrElasticsearchClient = errkit.Error("Elasticsearch Client Failed") ErrConsumerWorker = errkit.Error("Consumer Worker Failed") ErrMakeKafkaAdmin = errkit.Error("Make kafka admin failed") ErrMakeNewTopicWorker = errkit.Error("Make new topic worker failed") ErrSpawnWorkerOnNewTopic = errkit.Error("Spawn worker on new topic failed") ErrSpawnWorker = errkit.Error("Span worker failed") ErrHaltWorker = errkit.Error("Consumer Worker Halted") PrefixEventGroupID = "nte" )
View Source
const ( JsonParseError = errkit.Error("JSON Parse Error") ProtoParseError = errkit.Error("Protobuf Parse Error") )
View Source
const ( ErrMakeSyncProducer = errkit.Error("Make sync producer failed") ErrKafkaRetryLimitReached = errkit.Error("Error connecting to kafka, retry limit reached") ErrInitGrpc = errkit.Error("Failed to listen to gRPC address") ErrRegisterGrpc = errkit.Error("Error registering gRPC server endpoint into reverse proxy") ErrReverseProxy = errkit.Error("Error serving REST reverse proxy") )
View Source
const (
DEFAULT_ELASTIC_DOCUMENT_TYPE = "_doc"
)
View Source
const (
RetrieveMessageFailedError = errkit.Error("Retrieve message failed")
)
Variables ¶
This section is empty.
Functions ¶
func ConvertKafkaMessageToTimber ¶
func ConvertKafkaMessageToTimber(message *sarama.ConsumerMessage) (timber pb.Timber, err error)
func ConvertTimberToEsDocumentString ¶ added in v0.13.0
func ConvertTimberToKafkaMessage ¶
func ConvertTimberToKafkaMessage(timber *pb.Timber, topic string) *sarama.ProducerMessage
func GetApplicationSecretCollection ¶
func GetApplicationSecretCollection() []string
func InstruApplicationSecret ¶
func InstruApplicationSecret(appSecret string)
func NewDummyKafkaFactory ¶
func NewDummyKafkaFactory() *dummyKafkaFactory
func NewDummyRateLimiter ¶
func NewDummyRateLimiter() *dummyRateLimiter
func NewElastic ¶
func NewElastic(retrierFunc *ElasticRetrier, esConfig esConfig, urls []string, elasticUsername string, elasticPassword string) (client elasticClient, err error)
Types ¶
type ApplicationSecretCollection ¶
type ApplicationSecretCollection struct {
// contains filtered or unexported fields
}
type BaritoConsumerService ¶
type BaritoConsumerService interface {
Start() error
Close()
WorkerMap() map[string]ConsumerWorker
NewTopicEventWorker() ConsumerWorker
}
func NewBaritoConsumerService ¶
func NewBaritoConsumerService(params map[string]interface{}) BaritoConsumerService
type ClusterConsumer ¶
type ClusterConsumer interface {
Messages() <-chan *sarama.ConsumerMessage
Notifications() <-chan *cluster.Notification
Errors() <-chan error
MarkOffset(msg *sarama.ConsumerMessage, metadata string)
Close() error
}
Interfacing cluser.Consumer for testing purpose
type ConsumerWorker ¶
type ConsumerWorker interface {
Start()
Stop()
Halt()
IsStart() bool
OnError(f func(error))
OnSuccess(f func(*sarama.ConsumerMessage))
OnNotification(f func(*cluster.Notification))
}
func NewConsumerWorker ¶
func NewConsumerWorker(name string, consumer ClusterConsumer) ConsumerWorker
type ELasticTestHandler ¶
type ELasticTestHandler struct {
ExistAPIStatus int
CreateAPIStatus int
PostAPIStatus int
ResponseBody []byte
CustomHandler func(w http.ResponseWriter, r *http.Request)
}
func (*ELasticTestHandler) ServeHTTP ¶
func (handler *ELasticTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
type ElasticRetrier ¶
type ElasticRetrier struct {
// contains filtered or unexported fields
}
func NewElasticRetrier ¶
func NewElasticRetrier(t time.Duration, n int, f func(err error)) *ElasticRetrier
type KafkaAdmin ¶
type KafkaAdmin interface {
RefreshTopics() error
SetTopics([]string)
Topics() []string
AddTopic(topic string)
Exist(topic string) bool
CreateTopic(topic string, numPartitions int32, replicationFactor int16) error
Close()
}
func NewKafkaAdmin ¶
func NewKafkaAdmin(client sarama.Client) (admin KafkaAdmin, err error)
type KafkaFactory ¶
type KafkaFactory interface {
MakeKafkaAdmin() (admin KafkaAdmin, err error)
MakeClusterConsumer(groupID, topic string, initialOffset int64) (worker ClusterConsumer, err error)
MakeSyncProducer() (producer sarama.SyncProducer, err error)
}
func NewKafkaFactory ¶
func NewKafkaFactory(brokers []string, config *sarama.Config) KafkaFactory
type LeakyBucket ¶
type LeakyBucket struct {
// contains filtered or unexported fields
}
func NewLeakyBucket ¶
func NewLeakyBucket(max int32) *LeakyBucket
func (*LeakyBucket) IsFull ¶
func (b *LeakyBucket) IsFull() bool
func (*LeakyBucket) Max ¶
func (b *LeakyBucket) Max() int32
func (*LeakyBucket) Refill ¶
func (l *LeakyBucket) Refill()
func (*LeakyBucket) Take ¶
func (l *LeakyBucket) Take(count int) bool
func (*LeakyBucket) Token ¶
func (b *LeakyBucket) Token() int32
func (*LeakyBucket) UpdateMax ¶
func (b *LeakyBucket) UpdateMax(newMax int32)
type ProducerService ¶ added in v0.13.0
type ProducerService interface {
pb.ProducerServer
Start() error
LaunchREST() error
Close()
}
func NewProducerService ¶ added in v0.13.0
func NewProducerService(params map[string]interface{}) ProducerService
type RateLimiter ¶
type RateLimiter interface {
IsHitLimit(topic string, count int, maxTokenIfNotExist int32) bool
Start()
Stop()
IsStart() bool
PutBucket(topic string, bucket *LeakyBucket)
Bucket(topic string) *LeakyBucket
}
func NewRateLimiter ¶
func NewRateLimiter(duration int) RateLimiter
Source Files
¶
Click to show internal directories.
Click to hide internal directories.