Documentation
¶
Index ¶
- Constants
- Variables
- func AddFilter(filter EventFilter)
- func Close()
- func GetStats() map[string]interface{}
- func HealthCheck() error
- func Publish(topic string, payload any) error
- func PublishAsyncWithContext(ctx context.Context, topic string, payload any) error
- func PublishSync(topic string, payload any) error
- func PublishSyncWithContext(ctx context.Context, topic string, payload any) error
- func PublishWithContext(ctx context.Context, topic string, payload any) error
- func ResetSingleton()
- func SetTracer(tracer EventTracer)
- func Subscribe(topic string, handler any) error
- func SubscribeWithPriority(topic string, handler any, priority int) error
- func SubscribeWithResponse(topic string, handler ResponseHandler) error
- func SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
- func Unsubscribe(topic string, handler any) error
- func UnsubscribeAll(topic string) error
- func Use(middleware IMiddleware)
- func WrapError(err error, format string, args ...interface{}) error
- type CompleteMetadata
- type EventBus
- func (e *EventBus) AddFilter(filter EventFilter)
- func (e *EventBus) Close()
- func (e *EventBus) GetStats() map[string]interface{}
- func (e *EventBus) HealthCheck() error
- func (e *EventBus) NewGroup(prefix string) *TopicGroup
- func (e *EventBus) Publish(topic string, payload any) error
- func (e *EventBus) PublishAsyncWithContext(ctx context.Context, topic string, payload any) error
- func (e *EventBus) PublishSync(topic string, payload any) error
- func (e *EventBus) PublishSyncAll(topic string, payload any) (*SyncResult, error)
- func (e *EventBus) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- func (e *EventBus) PublishSyncAny(topic string, payload any) (*SyncResult, error)
- func (e *EventBus) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- func (e *EventBus) PublishSyncWithContext(ctx context.Context, topic string, payload any) error
- func (e *EventBus) PublishWithContext(ctx context.Context, topic string, payload any) error
- func (e *EventBus) SetTracer(tracer EventTracer)
- func (e *EventBus) Subscribe(topic string, handler any) error
- func (e *EventBus) SubscribeWithPriority(topic string, handler any, priority int) error
- func (e *EventBus) SubscribeWithResponse(topic string, handler ResponseHandler) error
- func (e *EventBus) SubscribeWithResponseAndPriority(topic string, handler ResponseHandler, priority int) error
- func (e *EventBus) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
- func (e *EventBus) SubscribeWithResponseContextAndPriority(topic string, handler ResponseHandlerWithContext, priority int) error
- func (e *EventBus) Unsubscribe(topic string, handler any) error
- func (e *EventBus) UnsubscribeAll(topic string) error
- func (e *EventBus) Use(middleware IMiddleware)
- type EventFilter
- type EventTracer
- type FilterFunc
- type Handler
- type HandlerInfo
- type HandlerResult
- type HandlerWithPriority
- type IMiddleware
- type LatencyStats
- type MetricsTracer
- func (m *MetricsTracer) GetLatencyMetrics() map[string]map[string]interface{}
- func (m *MetricsTracer) GetMetrics() map[string]interface{}
- func (m *MetricsTracer) GetQueueMetrics() map[string]map[string]int64
- func (m *MetricsTracer) OnComplete(topic string, metadata CompleteMetadata)
- func (m *MetricsTracer) OnError(topic string, err error)
- func (m *MetricsTracer) OnPublish(topic string, payload any, metadata PublishMetadata)
- func (m *MetricsTracer) OnQueueFull(topic string, size int)
- func (m *MetricsTracer) OnSlowConsumer(topic string, latency time.Duration)
- func (m *MetricsTracer) OnSubscribe(topic string, handler any)
- func (m *MetricsTracer) OnUnsubscribe(topic string, handler any)
- type Middleware
- type Pipe
- func (p *Pipe[T]) Close()
- func (p *Pipe[T]) GetStats() map[string]interface{}
- func (p *Pipe[T]) Publish(payload T) error
- func (p *Pipe[T]) PublishSync(payload T) error
- func (p *Pipe[T]) PublishSyncAll(payload T) (*PipeSyncResult, error)
- func (p *Pipe[T]) PublishSyncAny(payload T) (*PipeSyncResult, error)
- func (p *Pipe[T]) PublishWithContext(ctx context.Context, payload T) error
- func (p *Pipe[T]) Subscribe(handler Handler[T]) error
- func (p *Pipe[T]) SubscribeWithPriority(handler Handler[T], priority int) error
- func (p *Pipe[T]) SubscribeWithResponse(handler PipeResponseHandler[T]) (PipeResponseCancel, error)
- func (p *Pipe[T]) SubscribeWithResponseAndPriority(handler PipeResponseHandler[T], priority int) (PipeResponseCancel, error)
- func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error
- type PipeHandlerResult
- type PipeResponseCancel
- type PipeResponseHandler
- type PipeSyncResult
- type PublishMetadata
- type QueueStats
- type ResponseHandler
- type ResponseHandlerWithContext
- type SmartFilter
- type Subscription
- type SyncResult
- func PublishSyncAll(topic string, payload any) (*SyncResult, error)
- func PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- func PublishSyncAny(topic string, payload any) (*SyncResult, error)
- func PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- type TopicGroup
- func (g *TopicGroup) Publish(topic string, payload any) error
- func (g *TopicGroup) PublishSync(topic string, payload any) error
- func (g *TopicGroup) PublishSyncAll(topic string, payload any) (*SyncResult, error)
- func (g *TopicGroup) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- func (g *TopicGroup) PublishSyncAny(topic string, payload any) (*SyncResult, error)
- func (g *TopicGroup) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
- func (g *TopicGroup) PublishSyncWithContext(ctx context.Context, topic string, payload any) error
- func (g *TopicGroup) PublishWithContext(ctx context.Context, topic string, payload any) error
- func (g *TopicGroup) Subscribe(topic string, handler any) error
- func (g *TopicGroup) SubscribeWithPriority(topic string, handler any, priority int) error
- func (g *TopicGroup) SubscribeWithResponse(topic string, handler ResponseHandler) error
- func (g *TopicGroup) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
- func (g *TopicGroup) Unsubscribe(topic string, handler any) error
- func (g *TopicGroup) UnsubscribeAll(topic string) error
- type TopicStat
Constants ¶
const ( // DefaultTimeout 默认超时时间 DefaultTimeout = 5 * time.Second )
Variables ¶
var ( // ErrHandlerIsNotFunc 当处理器不是一个函数时返回 ErrHandlerIsNotFunc = errors.New("handler is not a function") // ErrHandlerParamNum 当处理器的参数数量不等于两个时返回 ErrHandlerParamNum = errors.New("handler must have exactly two parameters") // ErrHandlerFirstParam 当处理器的第一个参数不是字符串时返回 ErrHandlerFirstParam = errors.New("handler's first parameter must be string") // ErrHandlerReturnNum 当普通处理器返回值数量非法时返回 ErrHandlerReturnNum = errors.New("handler must not return values") // ErrResponseReturnNum 当响应式处理器返回值数量非法时返回 ErrResponseReturnNum = errors.New("response handler must return (any, error)") // ErrResponseReturnType 当响应式处理器第二个返回值不是 error 时返回 ErrResponseReturnType = errors.New("response handler second return value must be error") // ErrNoSubscriber 当某个主题没有订阅者时返回 ErrNoSubscriber = errors.New("no subscriber found for topic") // ErrChannelClosed 当尝试使用已关闭的通道时返回 ErrChannelClosed = errors.New("channel is closed") // ErrPublishTimeout 当发布操作超时时返回 ErrPublishTimeout = errors.New("publish operation timed out") // ErrInvalidTopic 当主题格式无效时返回 ErrInvalidTopic = errors.New("invalid topic format") // ErrEventBusClosed 当事件总线已关闭时返回 ErrEventBusClosed = errors.New("event bus is closed") )
var DefaultBus = New(defaultBufferSize)
DefaultBus 全局默认事件总线实例 建议:对于生产环境,推荐显式创建和管理 EventBus 实例以获得更好的控制和测试性
var TopicSeparators = []string{".", "/"}
TopicSeparators 定义允许的主题分隔符
Functions ¶
func PublishAsyncWithContext ¶ added in v0.2.0
PublishAsyncWithContext 带上下文异步发布,使用全局单例
func PublishSyncWithContext ¶ added in v0.2.0
PublishSyncWithContext 带上下文同步发布,使用全局单例
func PublishWithContext ¶ added in v0.1.6
PublishWithContext 带上下文发布消息,使用全局单例
func SubscribeWithPriority ¶ added in v0.1.6
SubscribeWithPriority 带优先级订阅,使用全局单例
func SubscribeWithResponse ¶ added in v0.2.0
func SubscribeWithResponse(topic string, handler ResponseHandler) error
SubscribeWithResponse 响应式订阅,使用全局单例
func SubscribeWithResponseContext ¶ added in v0.2.0
func SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
SubscribeWithResponseContext 带上下文的响应式订阅,使用全局单例
func UnsubscribeAll ¶ added in v0.2.0
UnsubscribeAll 取消主题的所有订阅,使用全局单例
Types ¶
type CompleteMetadata ¶ added in v0.1.3
type CompleteMetadata struct {
StartTime time.Time // 开始处理时间
EndTime time.Time // 结束处理时间
ProcessingTime time.Duration // 处理耗时
HandlerCount int // 处理器数量
Success bool // 是否成功
}
CompleteMetadata 事件处理完成的元数据
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus 优化后的事件总线
func (*EventBus) AddFilter ¶ added in v0.1.1
func (e *EventBus) AddFilter(filter EventFilter)
AddFilter 添加过滤器
func (*EventBus) NewGroup ¶ added in v0.1.2
func (e *EventBus) NewGroup(prefix string) *TopicGroup
NewGroup 创建一个新的主题组
func (*EventBus) PublishAsyncWithContext ¶ added in v0.2.0
PublishAsyncWithContext 显式的带上下文异步发布
func (*EventBus) PublishSync ¶
PublishSync 同步发布消息
func (*EventBus) PublishSyncAll ¶ added in v0.2.0
func (e *EventBus) PublishSyncAll(topic string, payload any) (*SyncResult, error)
PublishSyncAll 所有处理器成功才算成功
func (*EventBus) PublishSyncAllWithContext ¶ added in v0.2.0
func (e *EventBus) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
PublishSyncAllWithContext 所有处理器成功才算成功,且透传调用方上下文
func (*EventBus) PublishSyncAny ¶ added in v0.2.0
func (e *EventBus) PublishSyncAny(topic string, payload any) (*SyncResult, error)
PublishSyncAny 任一处理器成功即算成功
func (*EventBus) PublishSyncAnyWithContext ¶ added in v0.2.0
func (e *EventBus) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
PublishSyncAnyWithContext 任一处理器成功即算成功,且透传调用方上下文
func (*EventBus) PublishSyncWithContext ¶ added in v0.2.0
PublishSyncWithContext 带上下文的同步发布
func (*EventBus) PublishWithContext ¶ added in v0.1.6
PublishWithContext 带上下文的异步发布
func (*EventBus) SetTracer ¶ added in v0.1.3
func (e *EventBus) SetTracer(tracer EventTracer)
SetTracer 设置追踪器
func (*EventBus) SubscribeWithPriority ¶ added in v0.1.1
SubscribeWithPriority 带优先级订阅
func (*EventBus) SubscribeWithResponse ¶ added in v0.2.0
func (e *EventBus) SubscribeWithResponse(topic string, handler ResponseHandler) error
SubscribeWithResponse 订阅支持返回值的处理器(默认优先级0)
func (*EventBus) SubscribeWithResponseAndPriority ¶ added in v0.2.0
func (e *EventBus) SubscribeWithResponseAndPriority(topic string, handler ResponseHandler, priority int) error
SubscribeWithResponseAndPriority 带优先级的响应订阅
func (*EventBus) SubscribeWithResponseContext ¶ added in v0.2.0
func (e *EventBus) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
SubscribeWithResponseContext 订阅支持context的响应处理器(默认优先级0)
func (*EventBus) SubscribeWithResponseContextAndPriority ¶ added in v0.2.0
func (e *EventBus) SubscribeWithResponseContextAndPriority(topic string, handler ResponseHandlerWithContext, priority int) error
SubscribeWithResponseContextAndPriority 带优先级的context响应订阅
func (*EventBus) Unsubscribe ¶
Unsubscribe 取消订阅
func (*EventBus) UnsubscribeAll ¶ added in v0.1.4
UnsubscribeAll 取消主题的所有订阅
type EventFilter ¶ added in v0.1.1
type EventFilter interface {
// Filter 返回 true 表示事件可以继续传递,false 表示拦截该事件
Filter(topic string, payload any) bool
}
EventFilter 定义事件过滤器接口
type EventTracer ¶ added in v0.1.3
type EventTracer interface {
// OnPublish 在消息发布时调用
OnPublish(topic string, payload any, metadata PublishMetadata)
// OnSubscribe 在订阅时调用
OnSubscribe(topic string, handler any)
// OnUnsubscribe 在取消订阅时调用
OnUnsubscribe(topic string, handler any)
// OnError 在发生错误时调用
OnError(topic string, err error)
// OnComplete 在消息处理完成时调用
OnComplete(topic string, metadata CompleteMetadata)
// OnQueueFull 在队列满时调用
OnQueueFull(topic string, size int)
// OnSlowConsumer 在慢消费者时调用
OnSlowConsumer(topic string, latency time.Duration)
}
EventTracer 定义事件追踪接口
type FilterFunc ¶ added in v0.2.0
FilterFunc 函数式过滤器适配器,便于直接使用函数
type HandlerInfo ¶ added in v0.1.6
type HandlerInfo struct {
Priority int
ID uintptr // 用于唯一标识处理器
IsResponse bool // 标识是否为响应处理器
// contains filtered or unexported fields
}
HandlerInfo 处理器信息,支持优先级
type HandlerResult ¶ added in v0.2.0
type HandlerResult struct {
HandlerID string `json:"handler_id"` // 处理器唯一标识
Success bool `json:"success"` // 执行是否成功
Result any `json:"result"` // 返回值
Error error `json:"error"` // 错误信息
Duration time.Duration `json:"duration"` // 执行耗时
}
HandlerResult 处理器执行结果
type HandlerWithPriority ¶ added in v0.1.6
type HandlerWithPriority[T any] struct { Handler Handler[T] ResponseHandler PipeResponseHandler[T] Priority int ID uintptr }
HandlerWithPriority 带优先级的处理器
type IMiddleware ¶ added in v0.2.0
type IMiddleware interface {
// Before 在事件处理前执行,可以修改payload
Before(topic string, payload any) any
// After 在事件处理后执行
After(topic string, payload any)
}
IMiddleware 定义事件处理中间件接口
type LatencyStats ¶ added in v0.1.3
type LatencyStats struct {
// contains filtered or unexported fields
}
LatencyStats 延迟统计
type MetricsTracer ¶ added in v0.1.3
type MetricsTracer struct {
// contains filtered or unexported fields
}
MetricsTracer 实现基础的指标收集
func NewMetricsTracer ¶ added in v0.1.3
func NewMetricsTracer() *MetricsTracer
NewMetricsTracer 创建新的指标追踪器
func (*MetricsTracer) GetLatencyMetrics ¶ added in v0.1.3
func (m *MetricsTracer) GetLatencyMetrics() map[string]map[string]interface{}
GetLatencyMetrics 获取延迟指标
func (*MetricsTracer) GetMetrics ¶ added in v0.1.3
func (m *MetricsTracer) GetMetrics() map[string]interface{}
GetMetrics 获取当前指标
func (*MetricsTracer) GetQueueMetrics ¶ added in v0.1.3
func (m *MetricsTracer) GetQueueMetrics() map[string]map[string]int64
GetQueueMetrics 获取队列指标
func (*MetricsTracer) OnComplete ¶ added in v0.1.3
func (m *MetricsTracer) OnComplete(topic string, metadata CompleteMetadata)
OnComplete 在消息处理完成时调用
func (*MetricsTracer) OnError ¶ added in v0.1.3
func (m *MetricsTracer) OnError(topic string, err error)
OnError 在发生错误时调用
func (*MetricsTracer) OnPublish ¶ added in v0.1.3
func (m *MetricsTracer) OnPublish(topic string, payload any, metadata PublishMetadata)
OnPublish 在消息发布时调用
func (*MetricsTracer) OnQueueFull ¶ added in v0.1.3
func (m *MetricsTracer) OnQueueFull(topic string, size int)
OnQueueFull 在队列满时调用
func (*MetricsTracer) OnSlowConsumer ¶ added in v0.1.3
func (m *MetricsTracer) OnSlowConsumer(topic string, latency time.Duration)
OnSlowConsumer 在慢消费者时调用
func (*MetricsTracer) OnSubscribe ¶ added in v0.1.3
func (m *MetricsTracer) OnSubscribe(topic string, handler any)
OnSubscribe 在订阅时调用
func (*MetricsTracer) OnUnsubscribe ¶ added in v0.1.3
func (m *MetricsTracer) OnUnsubscribe(topic string, handler any)
OnUnsubscribe 在取消订阅时调用
type Middleware ¶ added in v0.1.1
type Middleware struct {
// contains filtered or unexported fields
}
Middleware 提供基础性能统计与可选的负载转换能力
func (*Middleware) After ¶ added in v0.1.1
func (m *Middleware) After(topic string, payload any)
After 记录耗时
func (*Middleware) Before ¶ added in v0.1.1
func (m *Middleware) Before(topic string, payload any) any
Before 记录开始时间,可选地转换负载
func (*Middleware) GetStats ¶ added in v0.2.0
func (m *Middleware) GetStats() map[string]TopicStat
GetStats 获取性能统计快照
func (*Middleware) SetTransformer ¶ added in v0.2.0
func (m *Middleware) SetTransformer(fn func(topic string, payload any) any)
SetTransformer 设置负载转换函数(可选)
type Pipe ¶
Pipe 优化后的泛型管道
func NewBufferedPipe ¶
NewBufferedPipe 创建一个带指定缓冲区大小的管道
func NewBufferedPipeWithTimeout ¶ added in v0.2.0
NewBufferedPipeWithTimeout 创建一个带缓冲的管道并自定义超时
func NewPipeWithTimeout ¶ added in v0.2.0
NewPipeWithTimeout 创建一个无缓冲管道并自定义超时
func (*Pipe[T]) PublishSync ¶
PublishSync 同步发送消息给所有订阅者
func (*Pipe[T]) PublishSyncAll ¶ added in v0.2.0
func (p *Pipe[T]) PublishSyncAll(payload T) (*PipeSyncResult, error)
PublishSyncAll 响应式同步发布,要求所有处理器都成功
func (*Pipe[T]) PublishSyncAny ¶ added in v0.2.0
func (p *Pipe[T]) PublishSyncAny(payload T) (*PipeSyncResult, error)
PublishSyncAny 响应式同步发布,只要有一个处理器成功即可
func (*Pipe[T]) PublishWithContext ¶ added in v0.1.6
PublishWithContext 带上下文的异步发送
func (*Pipe[T]) SubscribeWithPriority ¶ added in v0.1.6
SubscribeWithPriority 向管道添加一个带优先级的处理器
func (*Pipe[T]) SubscribeWithResponse ¶ added in v0.2.0
func (p *Pipe[T]) SubscribeWithResponse(handler PipeResponseHandler[T]) (PipeResponseCancel, error)
SubscribeWithResponse 向管道添加一个响应式处理器
func (*Pipe[T]) SubscribeWithResponseAndPriority ¶ added in v0.2.0
func (p *Pipe[T]) SubscribeWithResponseAndPriority(handler PipeResponseHandler[T], priority int) (PipeResponseCancel, error)
SubscribeWithResponseAndPriority 向管道添加一个带优先级的响应式处理器
func (*Pipe[T]) Unsubscribe ¶
Unsubscribe 从管道中移除一个处理器
type PipeHandlerResult ¶ added in v0.2.0
type PipeHandlerResult struct {
HandlerID string `json:"handler_id"` // 处理器唯一标识
Success bool `json:"success"` // 执行是否成功
Result any `json:"result"` // 返回值
Error error `json:"error"` // 错误信息
Duration time.Duration `json:"duration"` // 执行耗时
}
PipeHandlerResult 处理器执行结果
type PipeResponseCancel ¶ added in v0.2.0
type PipeResponseCancel func()
PipeResponseCancel 用于撤销响应式处理器订阅
type PipeResponseHandler ¶ added in v0.2.0
PipeResponseHandler 定义了一个函数类型,用于处理 T 类型的消息并返回响应
type PipeSyncResult ¶ added in v0.2.0
type PipeSyncResult struct {
Success bool `json:"success"` // 整体是否成功
HandlerCount int `json:"handler_count"` // 处理器总数
SuccessCount int `json:"success_count"` // 成功数量
FailureCount int `json:"failure_count"` // 失败数量
Results []PipeHandlerResult `json:"results"` // 详细结果
TotalTime time.Duration `json:"total_time"` // 总耗时
}
PipeSyncResult 同步发布结果
type PublishMetadata ¶ added in v0.1.3
type PublishMetadata struct {
Timestamp time.Time // 发布时间
Async bool // 是否异步发布
QueueSize int // 队列大小
PublisherID string // 发布者ID(可选)
}
PublishMetadata 发布事件的元数据
type QueueStats ¶ added in v0.1.3
type QueueStats struct {
// contains filtered or unexported fields
}
QueueStats 队列统计
type ResponseHandler ¶ added in v0.2.0
ResponseHandler 响应式处理器(不带context参数) 签名: func(topic string, payload any) (any, error)
type ResponseHandlerWithContext ¶ added in v0.2.0
ResponseHandlerWithContext 响应式处理器(带context参数) 签名: func(ctx context.Context, topic string, payload any) (any, error)
type SmartFilter ¶ added in v0.2.0
type SmartFilter struct {
// contains filtered or unexported fields
}
SmartFilter 提供基础限流与主题阻断能力
func NewSmartFilter ¶ added in v0.2.0
func NewSmartFilter() *SmartFilter
NewSmartFilter 创建智能过滤器,默认限流窗口为 1 分钟
func (*SmartFilter) BlockTopic ¶ added in v0.2.0
func (f *SmartFilter) BlockTopic(topic string)
BlockTopic 阻断指定主题(含其子层级)
func (*SmartFilter) Filter ¶ added in v0.2.0
func (f *SmartFilter) Filter(topic string, payload any) bool
Filter 实现 EventFilter 接口
func (*SmartFilter) SetLimit ¶ added in v0.2.0
func (f *SmartFilter) SetLimit(topic string, limit int)
SetLimit 设置指定主题的限流次数(窗口内最大可通过次数)
func (*SmartFilter) SetWindow ¶ added in v0.2.0
func (f *SmartFilter) SetWindow(window time.Duration)
SetWindow 调整限流窗口大小
func (*SmartFilter) UnblockTopic ¶ added in v0.2.0
func (f *SmartFilter) UnblockTopic(topic string)
UnblockTopic 解除主题阻断
type Subscription ¶ added in v0.1.1
Subscription 定义带优先级的订阅信息
type SyncResult ¶ added in v0.2.0
type SyncResult struct {
Success bool `json:"success"` // 整体是否成功
HandlerCount int `json:"handler_count"` // 处理器总数
SuccessCount int `json:"success_count"` // 成功数量
FailureCount int `json:"failure_count"` // 失败数量
Results []HandlerResult `json:"results"` // 详细结果
TotalTime time.Duration `json:"total_time"` // 总耗时
}
SyncResult 同步发布结果
func PublishSyncAll ¶ added in v0.2.0
func PublishSyncAll(topic string, payload any) (*SyncResult, error)
PublishSyncAll 响应式同步发布(全部成功),使用全局单例
func PublishSyncAllWithContext ¶ added in v0.2.0
PublishSyncAllWithContext 响应式同步发布(全部成功,透传上下文)
func PublishSyncAny ¶ added in v0.2.0
func PublishSyncAny(topic string, payload any) (*SyncResult, error)
PublishSyncAny 响应式同步发布(任一成功),使用全局单例
func PublishSyncAnyWithContext ¶ added in v0.2.0
PublishSyncAnyWithContext 响应式同步发布(任一成功,透传上下文)
type TopicGroup ¶ added in v0.1.2
type TopicGroup struct {
// contains filtered or unexported fields
}
TopicGroup 表示一个主题组
func (*TopicGroup) Publish ¶ added in v0.1.2
func (g *TopicGroup) Publish(topic string, payload any) error
Publish 在组内发布消息
func (*TopicGroup) PublishSync ¶ added in v0.2.0
func (g *TopicGroup) PublishSync(topic string, payload any) error
PublishSync 在组内同步发布消息
func (*TopicGroup) PublishSyncAll ¶ added in v0.2.0
func (g *TopicGroup) PublishSyncAll(topic string, payload any) (*SyncResult, error)
PublishSyncAll 组内响应式同步发布(全部成功)
func (*TopicGroup) PublishSyncAllWithContext ¶ added in v0.2.0
func (g *TopicGroup) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
PublishSyncAllWithContext 组内响应式同步发布(全部成功,透传上下文)
func (*TopicGroup) PublishSyncAny ¶ added in v0.2.0
func (g *TopicGroup) PublishSyncAny(topic string, payload any) (*SyncResult, error)
PublishSyncAny 组内响应式同步发布(任一成功)
func (*TopicGroup) PublishSyncAnyWithContext ¶ added in v0.2.0
func (g *TopicGroup) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)
PublishSyncAnyWithContext 组内响应式同步发布(任一成功,透传上下文)
func (*TopicGroup) PublishSyncWithContext ¶ added in v0.2.0
PublishSyncWithContext 在组内带上下文同步发布
func (*TopicGroup) PublishWithContext ¶ added in v0.2.0
PublishWithContext 在组内带上下文异步发布
func (*TopicGroup) Subscribe ¶ added in v0.1.2
func (g *TopicGroup) Subscribe(topic string, handler any) error
Subscribe 订阅组内的主题
func (*TopicGroup) SubscribeWithPriority ¶ added in v0.2.0
func (g *TopicGroup) SubscribeWithPriority(topic string, handler any, priority int) error
SubscribeWithPriority 组内带优先级订阅
func (*TopicGroup) SubscribeWithResponse ¶ added in v0.2.0
func (g *TopicGroup) SubscribeWithResponse(topic string, handler ResponseHandler) error
SubscribeWithResponse 组内响应式订阅
func (*TopicGroup) SubscribeWithResponseContext ¶ added in v0.2.0
func (g *TopicGroup) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error
SubscribeWithResponseContext 组内带上下文的响应式订阅
func (*TopicGroup) Unsubscribe ¶ added in v0.1.2
func (g *TopicGroup) Unsubscribe(topic string, handler any) error
Unsubscribe 取消订阅组内的主题
func (*TopicGroup) UnsubscribeAll ¶ added in v0.2.0
func (g *TopicGroup) UnsubscribeAll(topic string) error
UnsubscribeAll 取消组内主题的所有订阅