eventbus

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2025 License: MIT Imports: 11 Imported by: 0

README

EventBus 事件总线库

Go Reference Go Report Card Coverage Status MIT License

EventBus 是一个高性能的 Go 事件总线库,基于优化的写时复制(Copy-on-Write)机制实现高并发性能。提供事件发布/订阅、事件追踪、过滤器、中间件等企业级功能。

✨ 功能特性

  • 🚀 高性能异步/同步事件发布 - 优化的COW机制,读操作零锁竞争
  • 📊 完整事件追踪监控 - 生命周期追踪、性能指标、错误监控
  • 🔍 智能事件过滤器 - 频率限制、内容过滤、主题阻断
  • 🔧 灵活处理中间件 - 性能监控、日志记录、数据转换
  • ⏱️ 超时控制和上下文支持 - Context传播、超时处理、取消机制
  • 🔒 线程安全设计 - 原子操作、读写锁、竞态检测通过
  • 🎯 类型安全泛型管道 - 强类型消息传递、编译时类型检查
  • 🌟 完整MQTT通配符支持 - 支持 +*# 三种通配符和混合分隔符
  • 🎯 响应式同步发布 - PublishSyncAll/PublishSyncAny 及 WithContext 版本,支持处理器返回值与上下文透传
  • 📁 分组和命名空间支持 - 层级化主题管理、权限控制
  • 优先级订阅机制 - 处理器优先级排序、有序执行
  • 📈 实时性能统计 - 吞吐量、延迟、队列状态监控
  • 🏥 健康检查和故障恢复 - 系统状态监控、自动故障处理

📦 安装

go get github.com/darkit/eventbus

系统要求: Go 1.23+

🚀 快速开始

基础使用
package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/darkit/eventbus"
)

func main() {
    // 创建事件总线
    bus := eventbus.New(1024) // 缓冲大小1024
    defer bus.Close()

    // 优先级订阅(数字越大优先级越高)
    bus.SubscribeWithPriority("user.created", func(topic string, payload any) {
        fmt.Printf("🔴 高优先级处理: %v\n", payload)
    }, 10)

    bus.Subscribe("user.created", func(topic string, payload any) {
        fmt.Printf("🔵 普通处理: %v\n", payload)
    })

    // 异步发布
    bus.Publish("user.created", map[string]string{"name": "John"})

    // 同步发布
    bus.PublishSync("user.created", map[string]string{"name": "Jane"})

    // 带上下文的发布
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    bus.PublishWithContext(ctx, "user.created", map[string]string{"name": "Alice"})
}
通配符订阅
// + 中间单层通配符:匹配任意一个层级(MQTT标准)
bus.Subscribe("sensor/+/temperature", func(topic string, payload any) {
    fmt.Printf("温度传感器 %s: %v\n", topic, payload)
})

// * 末尾单层通配符:匹配最后一个层级
bus.Subscribe("alert/*", func(topic string, payload any) {
    fmt.Printf("告警事件 %s: %v\n", topic, payload)
})

// # 多层通配符:匹配零个或多个层级(MQTT标准)
bus.Subscribe("system/#", func(topic string, payload any) {
    fmt.Printf("系统事件 %s: %v\n", topic, payload)
})

// 混合分隔符支持:. 和 / 可以混合使用
bus.Subscribe("notifications/email/*", func(topic string, payload any) {
    fmt.Printf("邮件通知: %v\n", payload)
})

// 发布消息(保持原始主题格式)
bus.Publish("sensor/room1/temperature", "25°C")     // 匹配第一个订阅
bus.Publish("alert/fire", "房间1发生火灾")             // 匹配第二个订阅  
bus.Publish("system/cpu/high", "CPU使用率过高")        // 匹配第三个订阅
bus.Publish("notifications/email/welcome", "欢迎邮件") // 匹配第四个订阅
响应式同步发布

类型安全: EventBus 引入了具体的函数类型定义,提供编译时类型检查

  • ResponseHandler: func(topic string, payload any) (any, error)
  • ResponseHandlerWithContext: func(ctx context.Context, topic string, payload any) (any, error)
// 订阅支持返回值的处理器(不带context)
bus.SubscribeWithResponse("order/process", func(topic string, payload any) (any, error) {
    order := payload.(map[string]any)
    // 处理订单逻辑
    if order["amount"].(float64) > 1000 {
        return nil, errors.New("金额超限")
    }
    return map[string]any{"status": "success", "order_id": order["id"]}, nil
})

// 订阅支持context的响应式处理器
bus.SubscribeWithResponseContext("order/validate", func(ctx context.Context, topic string, payload any) (any, error) {
    // 可以使用context进行超时控制或取消
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
        // 执行验证逻辑
        return map[string]any{"valid": true}, nil
    }
})

bus.SubscribeWithResponse("order/process", func(topic string, payload any) (any, error) {
    // 库存检查
    return map[string]any{"inventory": "sufficient"}, nil
})

// PublishSyncAll: 所有处理器必须成功才算成功
result, err := bus.PublishSyncAll("order/process", map[string]any{
    "id": "ORDER-001", 
    "amount": 299.99,
})

if err != nil {
    log.Printf("发布超时: %v", err)
} else if result.Success {
    fmt.Printf("✅ 订单处理成功! 耗时: %v\n", result.TotalTime)
    fmt.Printf("📊 统计: %d/%d 处理器成功\n", result.SuccessCount, result.HandlerCount)
    
    // 查看处理器返回值
    for _, handlerResult := range result.Results {
        if handlerResult.Success {
            fmt.Printf("处理器响应: %v (耗时: %v)\n", 
                handlerResult.Result, handlerResult.Duration)
        }
    }
} else {
    fmt.Printf("❌ 订单处理失败: %d/%d 处理器成功\n", 
        result.SuccessCount, result.HandlerCount)
    
    // 查看失败原因
    for _, handlerResult := range result.Results {
        if !handlerResult.Success {
            fmt.Printf("处理器失败: %v\n", handlerResult.Error)
        }
    }
}

// PublishSyncAny: 任一处理器成功即算成功
result, err = bus.PublishSyncAny("notification/send", map[string]any{
    "recipient": "[email protected]",
    "message": "订单确认",
})

if result.Success {
    fmt.Printf("✅ 通知发送成功! 耗时: %v\n", result.TotalTime)
} else {
    fmt.Printf("❌ 所有通知渠道都失败了\n")
}

🏗️ 核心组件

EventBus - 事件总线
// 创建选项
bus := eventbus.New()                    // 无缓冲,实时性最高
bus := eventbus.New(1024)               // 指定缓冲,固定大小
bus := eventbus.New(-1)                 // 智能缓冲(CPU核心数*64,推荐)

// 订阅管理
bus.Subscribe("topic", handler)                           // 普通订阅
bus.SubscribeWithPriority("topic", handler, priority)     // 优先级订阅
bus.SubscribeWithResponse("topic", responseHandler)       // 响应式订阅(不带context)
bus.SubscribeWithResponseContext("topic", handler)        // 响应式订阅(带context)
bus.Unsubscribe("topic", handler)                        // 取消订阅
bus.UnsubscribeAll("topic")                              // 取消所有订阅

// 发布选项
bus.Publish("topic", payload)                            // 异步发布
bus.PublishSync("topic", payload)                        // 同步发布
bus.PublishWithContext(ctx, "topic", payload)            // 带上下文异步发布
bus.PublishSyncWithContext(ctx, "topic", payload)        // 带上下文同步发布
bus.PublishAsyncWithContext(ctx, "topic", payload)       // 显式异步(等同 PublishWithContext)
// 说明:若异步发布时缓冲已满,Publish 会阻塞等待直至 DefaultTimeout(默认 5 秒)
//       或 ctx 截止时间到来,也可响应取消信号;成功写入后立即返回。

// 响应式发布
result, err := bus.PublishSyncAll("topic", payload)                 // 所有处理器成功才算成功(默认 5 秒超时)
result, err := bus.PublishSyncAny("topic", payload)                 // 任一处理器成功即算成功(默认 5 秒超时)
result, err := bus.PublishSyncAllWithContext(ctx, "topic", payload) // 继承调用方 ctx,必要时自动补充默认超时
result, err := bus.PublishSyncAnyWithContext(ctx, "topic", payload) // 任一处理器成功即返回并取消其他处理器

// 提示:当 ctx 未设置截止时间时,WithContext 变体会自动叠加默认超时;
// 如果需要自定义时限,可在调用前通过 context.WithTimeout/WithDeadline 设置专属超时。

// 系统管理
stats := bus.GetStats()                                   // 获取统计信息
err := bus.HealthCheck()                                  // 健康检查
bus.Close()                                              // 关闭总线
泛型管道 (Pipe) - 类型安全
// 创建类型安全的管道
intPipe := eventbus.NewPipe[int]()                         // 无缓冲
msgPipe := eventbus.NewBufferedPipe[Message](100)          // 带缓冲
customPipe := eventbus.NewBufferedPipeWithTimeout[int](10, 3*time.Second) // 自定义超时

// 普通订阅处理
intPipe.SubscribeWithPriority(func(val int) {
    fmt.Printf("高优先级处理: %d\n", val)
}, 10)

intPipe.Subscribe(func(val int) {
    fmt.Printf("普通处理: %d\n", val)
})

// 响应式订阅
cancelResponse, err := intPipe.SubscribeWithResponse(func(val int) (any, error) {
    if val < 0 {
        return nil, errors.New("负数不被支持")
    }
    return val * 2, nil // 返回处理结果
})

// 发布消息
intPipe.Publish(42)                             // 异步
intPipe.PublishSync(42)                         // 同步
intPipe.PublishWithContext(ctx, 42)             // 带上下文

// 响应式发布
result, err := intPipe.PublishSyncAll(42)       // 所有处理器成功才算成功
if err != nil {
    log.Printf("发布超时: %v", err)
} else if result.Success {
    fmt.Printf("✅ 处理成功! 耗时: %v\n", result.TotalTime)
    for _, handlerResult := range result.Results {
        if handlerResult.Success {
            fmt.Printf("处理器返回: %v\n", handlerResult.Result)
        }
    }
}

result, err = intPipe.PublishSyncAny(42)        // 任一处理器成功即算成功

// 注意:PublishSyncAll/PublishSyncAny 使用默认5秒超时
// 如需自定义超时,请在处理器内部使用 context

// 取消订阅
cancelResponse()                                // 取消响应式订阅

// 管理
stats := intPipe.GetStats()                     // 统计信息
intPipe.Close()                                // 关闭管道
全局单例 - 便捷访问
import "github.com/darkit/eventbus"

// 直接使用全局实例
eventbus.Subscribe("global.event", handler)
eventbus.Publish("global.event", payload)
eventbus.PublishSync("global.event", payload)
eventbus.PublishWithContext(ctx, "global.event", payload)
eventbus.PublishSyncWithContext(ctx, "global.event", payload)

// 系统管理
eventbus.HealthCheck()              // 健康检查
eventbus.Close()                    // 关闭全局实例

🔧 高级功能

事件追踪器 - 监控和调试
type EventTracer interface {
    OnPublish(topic string, payload any, metadata PublishMetadata)
    OnSubscribe(topic string, handler any)
    OnUnsubscribe(topic string, handler any)
    OnError(topic string, err error)
    OnComplete(topic string, metadata CompleteMetadata)
    OnQueueFull(topic string, size int)
    OnSlowConsumer(topic string, latency time.Duration)
}

// 使用内置的指标追踪器
tracer := eventbus.NewMetricsTracer()
bus.SetTracer(tracer)

// 获取指标
metrics := tracer.GetMetrics()
fmt.Printf("发布次数: %d\n", metrics["message_count"])
fmt.Printf("错误次数: %d\n", metrics["error_count"])
智能过滤器 - 流量控制
filter := eventbus.NewSmartFilter()

filter.SetLimit("user.login", 100)        // 每分钟最多 100 次登录事件
filter.SetWindow(30 * time.Second)         // 自定义限流窗口(默认 1 分钟)
filter.BlockTopic("internal.test")        // 阻断测试主题及其子主题

bus.AddFilter(filter)                      // 注册智能过滤器

// 动态调整:
filter.UnblockTopic("internal.test")      // 解除阻断
filter.SetLimit("user.login", 0)          // 移除针对 user.login 的限流

// 自定义过滤器:直接使用 FilterFunc 包装函数
bus.AddFilter(eventbus.FilterFunc(func(topic string, payload any) bool {
    return !strings.Contains(topic, "spam")
}))
性能中间件 - 监控和增强
middleware := eventbus.NewMiddleware()

// 可选:对负载进行统一转换
middleware.SetTransformer(func(topic string, payload any) any {
    if msg, ok := payload.(string); ok {
        return strings.TrimSpace(msg)
    }
    return payload
})

bus.Use(middleware)

// 获取性能统计
stats := middleware.GetStats()
for topic, stat := range stats {
    avg := stat.TotalTime / time.Duration(stat.Count)
    fmt.Printf("主题 %s: 执行 %d 次,平均耗时 %v\n", topic, stat.Count, avg)
}

middleware.Reset() // 清除历史统计

🎯 使用场景

响应式事务处理 (PublishSyncAll)
// 订单处理:所有步骤都必须成功
bus.SubscribeWithResponse("order/create", func(topic string, payload any) (any, error) {
    order := payload.(Order)
    
    // 库存检查
    if !checkInventory(order.ProductID, order.Quantity) {
        return nil, errors.New("库存不足")
    }
    return map[string]any{"step": "inventory", "status": "ok"}, nil
})

bus.SubscribeWithResponse("order/create", func(topic string, payload any) (any, error) {
    order := payload.(Order)
    
    // 支付处理
    transactionID, err := processPayment(order.Amount, order.PaymentMethod)
    if err != nil {
        return nil, fmt.Errorf("支付失败: %w", err)
    }
    return map[string]any{"step": "payment", "transaction_id": transactionID}, nil
})

bus.SubscribeWithResponse("order/create", func(topic string, payload any) (any, error) {
    order := payload.(Order)
    
    // 发货安排
    trackingID, err := arrangeShipping(order)
    if err != nil {
        return nil, fmt.Errorf("发货失败: %w", err)
    }
    return map[string]any{"step": "shipping", "tracking_id": trackingID}, nil
})

// 创建订单 - 必须所有步骤都成功
result, err := bus.PublishSyncAll("order/create", Order{
    ID:            "ORDER-123",
    ProductID:     "PROD-456", 
    Quantity:      2,
    Amount:        299.99,
    PaymentMethod: "credit_card",
})

if err != nil {
    log.Printf("订单处理超时: %v", err)
} else if result.Success {
    // 所有步骤都成功
    log.Printf("✅ 订单创建成功,耗时: %v", result.TotalTime)
    
    // 提取各步骤结果
    var transactionID, trackingID string
    for _, handlerResult := range result.Results {
        if handlerResult.Success {
            stepResult := handlerResult.Result.(map[string]any)
            switch stepResult["step"] {
            case "payment":
                transactionID = stepResult["transaction_id"].(string)
            case "shipping":
                trackingID = stepResult["tracking_id"].(string)
            }
        }
    }
    
    // 发送确认邮件
    sendOrderConfirmation(transactionID, trackingID)
} else {
    // 部分步骤失败,需要回滚
    log.Printf("❌ 订单创建失败: %d/%d 步骤成功", result.SuccessCount, result.HandlerCount)
    
    for _, handlerResult := range result.Results {
        if !handlerResult.Success {
            log.Printf("步骤失败: %v", handlerResult.Error)
        }
    }
    
    // 执行回滚逻辑
    rollbackOrder("ORDER-123")
}
高可用通知服务 (PublishSyncAny)
// 多渠道通知:任一渠道成功即可
bus.SubscribeWithResponse("notification/send", func(topic string, payload any) (any, error) {
    notification := payload.(Notification)
    
    // 邮件通知(主渠道)
    err := emailService.Send(notification.Recipient, notification.Subject, notification.Body)
    if err != nil {
        return nil, fmt.Errorf("邮件发送失败: %w", err)
    }
    return map[string]any{"channel": "email", "message_id": "EMAIL-123"}, nil
})

bus.SubscribeWithResponse("notification/send", func(topic string, payload any) (any, error) {
    notification := payload.(Notification)
    
    // 短信通知(备用渠道)
    messageID, err := smsService.Send(notification.Phone, notification.Body)
    if err != nil {
        return nil, fmt.Errorf("短信发送失败: %w", err)
    }
    return map[string]any{"channel": "sms", "message_id": messageID}, nil
})

bus.SubscribeWithResponse("notification/send", func(topic string, payload any) (any, error) {
    notification := payload.(Notification)
    
    // 推送通知(备用渠道)
    pushID, err := pushService.Send(notification.UserID, notification.Title, notification.Body)
    if err != nil {
        return nil, fmt.Errorf("推送发送失败: %w", err)
    }
    return map[string]any{"channel": "push", "message_id": pushID}, nil
})

// 发送通知 - 任一渠道成功即可
result, err := bus.PublishSyncAny("notification/send", Notification{
    Recipient: "[email protected]",
    Phone:     "+1234567890",
    UserID:    "USER-789",
    Subject:   "订单确认",
    Title:     "订单已创建",
    Body:      "您的订单 ORDER-123 已成功创建",
})

if err != nil {
    log.Printf("通知发送超时: %v", err)
} else if result.Success {
    log.Printf("✅ 通知发送成功,耗时: %v", result.TotalTime)
    
    // 记录成功的渠道
    for _, handlerResult := range result.Results {
        if handlerResult.Success {
            channelResult := handlerResult.Result.(map[string]any)
            log.Printf("通过 %s 渠道发送成功,消息ID: %s", 
                channelResult["channel"], channelResult["message_id"])
        }
    }
} else {
    log.Printf("❌ 所有通知渠道都失败了")
    
    // 记录所有失败原因
    for _, handlerResult := range result.Results {
        if !handlerResult.Success {
            log.Printf("渠道失败: %v", handlerResult.Error)
        }
    }
    
    // 触发告警
    alertService.SendAlert("通知系统全部失败", "所有通知渠道都无法使用")
}
微服务通信
// 服务间事件通信
bus.Subscribe("order.#", func(topic string, payload any) {
    switch topic {
    case "order.created":
        // 处理订单创建
    case "order.payment.completed":
        // 处理支付完成
    }
})

bus.Publish("order.created", OrderEvent{
    OrderID: "123",
    UserID:  "456",
    Amount:  99.99,
})
系统监控
// 系统指标收集
bus.Subscribe("metrics.#", func(topic string, payload any) {
    if metric, ok := payload.(MetricEvent); ok {
        // 发送到监控系统
        prometheus.RecordMetric(metric)
    }
})

// 发布CPU使用率
bus.Publish("metrics.cpu.usage", MetricEvent{
    Name:  "cpu_usage",
    Value: 85.5,
    Tags:  map[string]string{"host": "server1"},
})
用户行为追踪
// 用户行为分析
bus.Subscribe("user.action.*", func(topic string, payload any) {
    action := payload.(UserAction)
    analytics.Track(action.UserID, action.Event, action.Properties)
})

bus.Publish("user.action.click", UserAction{
    UserID: "user123",
    Event:  "button_click",
    Properties: map[string]any{
        "button_id": "checkout",
        "page":      "product_detail",
    },
})

⚡ 性能优化

基准测试结果
# 运行性能测试
go test -bench=. -benchmem

# 最新基准测试结果
BenchmarkEventBusPublishSync-383         2,125,314    595.0 ns/op   111 B/op    3 allocs/op
BenchmarkEventBusPublish-383               710,479   1827 ns/op     407 B/op    7 allocs/op
BenchmarkPipePublishSync-383            28,528,402     47.98 ns/op     8 B/op    1 allocs/op
BenchmarkCowMapLoad-383                 87,791,270     13.30 ns/op     0 B/op    0 allocs/op

# 响应式发布性能
BenchmarkPublishSyncAll-383                  5,067    198,873 ns/op   8,961 B/op  134 allocs/op
BenchmarkPublishSyncAny-383                 24,356     48,654 ns/op   5,811 B/op   87 allocs/op

# Pipe 响应式发布性能
BenchmarkPipePublishSyncAll-383            143,481      6,989 ns/op   1,291 B/op   34 allocs/op
BenchmarkPipeTraditionalPublishSync-383  9,023,946        157.4 ns/op     48 B/op    1 allocs/op
性能优化建议
  1. 缓冲区配置

    // 实时性优先:无缓冲(最低延迟)
    bus := eventbus.New()        // 或 eventbus.New(0)
    
    // 性能优先:智能自动缓冲(推荐)
    bus := eventbus.New(-1)      // CPU核心数 * 64,平衡性能和内存
    
    // 高吞吐量:大缓冲(内存充足场景)
    bus := eventbus.New(10000)   // 自定义大缓冲区
    
    // 轻量级:小缓冲(资源受限场景)
    bus := eventbus.New(256)     // 适度缓冲
    
  2. 发布方式选择

    // 高吞吐量:异步发布
    bus.Publish("topic", payload)
    
    // 即时反馈:同步发布
    bus.PublishSync("topic", payload)
    
    // 超时控制:带上下文发布
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    bus.PublishWithContext(ctx, "topic", payload)
    
  3. 订阅优化

    // 避免过多通配符订阅
    bus.Subscribe("user.login", handler)     // 好
    bus.Subscribe("user.*", handler)         // 可接受
    bus.Subscribe("#", handler)              // 避免,影响性能
    

🌐 MQTT 兼容性

EventBus 完全支持 MQTT 3.1.1 和 MQTT 5.0 规范的主题过滤器,提供三种通配符:

通配符类型
通配符 类型 说明 示例
+ 单层级 匹配任意一个层级,可在任意位置 sensor/+/temp 匹配 sensor/room1/temp
* 末尾单层级 匹配最后一个层级,只能在末尾 alert/* 匹配 alert/fire
# 多层级 匹配零个或多个层级,只能在末尾 system/# 匹配 system/cpu/high
分隔符支持
// 支持点分隔符 (.)
bus.Subscribe("sensor.room1.temperature", handler)

// 支持斜杠分隔符 (/)  
bus.Subscribe("sensor/room1/temperature", handler)

// 支持混合分隔符
bus.Subscribe("sensor/room1.temperature", handler)  // 内部统一处理

详细的MQTT兼容性说明请参考 MQTT_COMPATIBILITY.md

🛡️ 错误处理

标准错误类型
import "github.com/darkit/eventbus"

// 错误处理示例
if err := bus.Publish("topic", payload); err != nil {
    switch {
    case errors.Is(err, eventbus.ErrChannelClosed):
        log.Println("通道已关闭")
    case errors.Is(err, eventbus.ErrPublishTimeout):
        log.Println("发布超时")
    case errors.Is(err, eventbus.ErrEventBusClosed):
        log.Println("事件总线已关闭")
    case errors.Is(err, eventbus.ErrNoSubscriber):
        log.Println("没有订阅者")
    default:
        log.Printf("未知错误: %v", err)
    }
}

当发布返回 ErrPublishTimeout 时,通常表示目标主题的缓冲通道已满且在默认 5 秒内未能腾出空间;如果调用方提供了 ctx,则以 ctx 的截止时间或取消信号为准。可通过增大缓冲区、加快订阅者处理速度或调整 ctx 超时时间来缓解。

错误恢复
// 设置追踪器处理错误
tracer := &ErrorRecoveryTracer{}
bus.SetTracer(tracer)

type ErrorRecoveryTracer struct{}

func (t *ErrorRecoveryTracer) OnError(topic string, err error) {
    // 记录错误日志
    log.Printf("事件处理错误 [%s]: %v", topic, err)
    
    // 发送告警
    alerting.SendAlert("EventBus Error", err.Error())
    
    // 尝试重试或降级处理
    if isRetryableError(err) {
        // 重试逻辑
    }
}

🏗️ 架构设计

核心架构
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Publishers    │────│    EventBus      │────│   Subscribers   │
│                 │    │                  │    │                 │
│ • HTTP Handler  │    │ • Topic Router   │    │ • Log Handler   │
│ • Cron Jobs     │    │ • Filter Chain   │    │ • DB Handler    │
│ • External APIs │    │ • Middleware     │    │ • Email Service │
└─────────────────┘    │ • COW Map        │    └─────────────────┘
                       │ • Priority Queue │
                       │ • Health Monitor │
                       └──────────────────┘
                              │
                    ┌─────────┴─────────┐
                    │     Monitoring    │
                    │                   │
                    │ • Event Tracer    │
                    │ • Metrics         │
                    │ • Performance     │
                    └───────────────────┘
设计原则
  • 解耦: 发布者和订阅者完全解耦
  • 可扩展: 支持中间件和过滤器扩展
  • 高性能: 优化的COW机制,最小化锁竞争
  • 类型安全: 泛型管道提供编译时类型检查
  • 可观测: 完整的监控和追踪能力

📚 完整示例

查看示例代码获取完整的使用示例:

  • 完整功能示例 - 包括优先级订阅、通配符匹配、错误处理、泛型管道、全局单例、中间件和过滤器
  • 响应式发布示例 - PublishSyncAll/PublishSyncAny 完整演示
# 运行完整示例
go run examples/full/main.go

# 运行响应式示例
go run examples/response/main.go

# 运行示例测试
go test ./examples/full/ ./examples/response/

🔧 开发工具

Makefile 命令
# 格式化代码
make fmt

# 代码检查
make lint
make vet

# 运行测试
make test
make test-race

# 性能测试
make benchmark

# 构建项目
make build

# 查看帮助
make help
项目结构
eventbus/
├── README.md              # 项目说明
├── go.mod                 # Go模块定义
├── *.go                   # 核心源码文件
├── *_test.go              # 单元测试文件
├── examples/              # 使用示例
│   ├── full/             # 完整功能示例
│   │   ├── main.go       # 完整示例代码
│   │   └── main_test.go  # 示例测试
│   └── response/         # 响应式发布示例
│       └── main.go       # 响应式示例代码
├── docs/                  # 项目文档
│   ├── images/           # 架构图表
│   │   ├── 架构图.md     # 系统架构图
│   │   ├── 流程图.md     # 业务流程图
│   │   └── 时序图.md     # 时序交互图
│   ├── EventBus全面评估报告.md # 代码质量评估
│   ├── 评估报告-执行摘要.md # 项目执行摘要
│   ├── ToDos.md          # 任务清单
│   ├── MQTT_COMPATIBILITY.md  # MQTT兼容性文档
│   └── PublishSync响应式设计评估.md  # 响应式发布设计文档
└── Makefile              # 构建脚本

📊 文档资源

架构设计文档
开发文档

🤝 贡献指南

我们欢迎所有形式的贡献!

  1. Fork 本仓库
  2. 创建特性分支 (git checkout -b feature/amazing-feature)
  3. 提交更改 (git commit -m 'Add some amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 开启 Pull Request
贡献要求
  • 遵循 Go 代码规范
  • 添加适当的测试用例
  • 更新相关文档
  • 通过所有 CI 检查

📜 许可证

本项目采用 MIT License 许可证。

🙏 致谢

感谢所有贡献者对 EventBus 项目的支持和贡献!


EventBus - 让事件驱动架构更简单、更高效! 🚀

Documentation

Index

Constants

View Source
const (
	// DefaultTimeout 默认超时时间
	DefaultTimeout = 5 * time.Second
)

Variables

View Source
var (
	// ErrHandlerIsNotFunc 当处理器不是一个函数时返回
	ErrHandlerIsNotFunc = errors.New("handler is not a function")

	// ErrHandlerParamNum 当处理器的参数数量不等于两个时返回
	ErrHandlerParamNum = errors.New("handler must have exactly two parameters")

	// ErrHandlerFirstParam 当处理器的第一个参数不是字符串时返回
	ErrHandlerFirstParam = errors.New("handler's first parameter must be string")

	// ErrHandlerReturnNum 当普通处理器返回值数量非法时返回
	ErrHandlerReturnNum = errors.New("handler must not return values")

	// ErrResponseReturnNum 当响应式处理器返回值数量非法时返回
	ErrResponseReturnNum = errors.New("response handler must return (any, error)")

	// ErrResponseReturnType 当响应式处理器第二个返回值不是 error 时返回
	ErrResponseReturnType = errors.New("response handler second return value must be error")

	// ErrNoSubscriber 当某个主题没有订阅者时返回
	ErrNoSubscriber = errors.New("no subscriber found for topic")

	// ErrChannelClosed 当尝试使用已关闭的通道时返回
	ErrChannelClosed = errors.New("channel is closed")

	// ErrPublishTimeout 当发布操作超时时返回
	ErrPublishTimeout = errors.New("publish operation timed out")

	// ErrInvalidTopic 当主题格式无效时返回
	ErrInvalidTopic = errors.New("invalid topic format")

	// ErrEventBusClosed 当事件总线已关闭时返回
	ErrEventBusClosed = errors.New("event bus is closed")
)
View Source
var DefaultBus = New(defaultBufferSize)

DefaultBus 全局默认事件总线实例 建议:对于生产环境,推荐显式创建和管理 EventBus 实例以获得更好的控制和测试性

View Source
var TopicSeparators = []string{".", "/"}

TopicSeparators 定义允许的主题分隔符

Functions

func AddFilter added in v0.1.6

func AddFilter(filter EventFilter)

AddFilter 添加过滤器,使用全局单例

func Close

func Close()

Close 关闭全局单例

func GetStats added in v0.2.0

func GetStats() map[string]interface{}

GetStats 获取统计信息,使用全局单例

func HealthCheck added in v0.1.6

func HealthCheck() error

HealthCheck 健康检查,使用全局单例

func Publish

func Publish(topic string, payload any) error

Publish 异步发布消息,使用全局单例

func PublishAsyncWithContext added in v0.2.0

func PublishAsyncWithContext(ctx context.Context, topic string, payload any) error

PublishAsyncWithContext 带上下文异步发布,使用全局单例

func PublishSync

func PublishSync(topic string, payload any) error

PublishSync 同步发布消息,使用全局单例

func PublishSyncWithContext added in v0.2.0

func PublishSyncWithContext(ctx context.Context, topic string, payload any) error

PublishSyncWithContext 带上下文同步发布,使用全局单例

func PublishWithContext added in v0.1.6

func PublishWithContext(ctx context.Context, topic string, payload any) error

PublishWithContext 带上下文发布消息,使用全局单例

func ResetSingleton

func ResetSingleton()

ResetSingleton 重置单例对象,主要用于测试

func SetTracer added in v0.1.6

func SetTracer(tracer EventTracer)

SetTracer 设置追踪器,使用全局单例

func Subscribe

func Subscribe(topic string, handler any) error

Subscribe 订阅主题,使用全局单例

func SubscribeWithPriority added in v0.1.6

func SubscribeWithPriority(topic string, handler any, priority int) error

SubscribeWithPriority 带优先级订阅,使用全局单例

func SubscribeWithResponse added in v0.2.0

func SubscribeWithResponse(topic string, handler ResponseHandler) error

SubscribeWithResponse 响应式订阅,使用全局单例

func SubscribeWithResponseContext added in v0.2.0

func SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error

SubscribeWithResponseContext 带上下文的响应式订阅,使用全局单例

func Unsubscribe

func Unsubscribe(topic string, handler any) error

Unsubscribe 取消订阅主题,使用全局单例

func UnsubscribeAll added in v0.2.0

func UnsubscribeAll(topic string) error

UnsubscribeAll 取消主题的所有订阅,使用全局单例

func Use added in v0.1.6

func Use(middleware IMiddleware)

Use 添加中间件,使用全局单例

func WrapError

func WrapError(err error, format string, args ...interface{}) error

WrapError 用附加的上下文信息包装错误

Types

type CompleteMetadata added in v0.1.3

type CompleteMetadata struct {
	StartTime      time.Time     // 开始处理时间
	EndTime        time.Time     // 结束处理时间
	ProcessingTime time.Duration // 处理耗时
	HandlerCount   int           // 处理器数量
	Success        bool          // 是否成功
}

CompleteMetadata 事件处理完成的元数据

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus 优化后的事件总线

func New

func New(bufferSize ...int) *EventBus

New 创建事件总线 可选参数:bufferSize - 缓冲区大小,不传或传0表示无缓冲,负数使用默认缓冲大小

func (*EventBus) AddFilter added in v0.1.1

func (e *EventBus) AddFilter(filter EventFilter)

AddFilter 添加过滤器

func (*EventBus) Close

func (e *EventBus) Close()

Close 关闭事件总线

func (*EventBus) GetStats added in v0.1.6

func (e *EventBus) GetStats() map[string]interface{}

GetStats 获取统计信息

func (*EventBus) HealthCheck

func (e *EventBus) HealthCheck() error

HealthCheck 健康检查

func (*EventBus) NewGroup added in v0.1.2

func (e *EventBus) NewGroup(prefix string) *TopicGroup

NewGroup 创建一个新的主题组

func (*EventBus) Publish

func (e *EventBus) Publish(topic string, payload any) error

Publish 异步发布消息

func (*EventBus) PublishAsyncWithContext added in v0.2.0

func (e *EventBus) PublishAsyncWithContext(ctx context.Context, topic string, payload any) error

PublishAsyncWithContext 显式的带上下文异步发布

func (*EventBus) PublishSync

func (e *EventBus) PublishSync(topic string, payload any) error

PublishSync 同步发布消息

func (*EventBus) PublishSyncAll added in v0.2.0

func (e *EventBus) PublishSyncAll(topic string, payload any) (*SyncResult, error)

PublishSyncAll 所有处理器成功才算成功

func (*EventBus) PublishSyncAllWithContext added in v0.2.0

func (e *EventBus) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAllWithContext 所有处理器成功才算成功,且透传调用方上下文

func (*EventBus) PublishSyncAny added in v0.2.0

func (e *EventBus) PublishSyncAny(topic string, payload any) (*SyncResult, error)

PublishSyncAny 任一处理器成功即算成功

func (*EventBus) PublishSyncAnyWithContext added in v0.2.0

func (e *EventBus) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAnyWithContext 任一处理器成功即算成功,且透传调用方上下文

func (*EventBus) PublishSyncWithContext added in v0.2.0

func (e *EventBus) PublishSyncWithContext(ctx context.Context, topic string, payload any) error

PublishSyncWithContext 带上下文的同步发布

func (*EventBus) PublishWithContext added in v0.1.6

func (e *EventBus) PublishWithContext(ctx context.Context, topic string, payload any) error

PublishWithContext 带上下文的异步发布

func (*EventBus) SetTracer added in v0.1.3

func (e *EventBus) SetTracer(tracer EventTracer)

SetTracer 设置追踪器

func (*EventBus) Subscribe

func (e *EventBus) Subscribe(topic string, handler any) error

Subscribe 订阅主题

func (*EventBus) SubscribeWithPriority added in v0.1.1

func (e *EventBus) SubscribeWithPriority(topic string, handler any, priority int) error

SubscribeWithPriority 带优先级订阅

func (*EventBus) SubscribeWithResponse added in v0.2.0

func (e *EventBus) SubscribeWithResponse(topic string, handler ResponseHandler) error

SubscribeWithResponse 订阅支持返回值的处理器(默认优先级0)

func (*EventBus) SubscribeWithResponseAndPriority added in v0.2.0

func (e *EventBus) SubscribeWithResponseAndPriority(topic string, handler ResponseHandler, priority int) error

SubscribeWithResponseAndPriority 带优先级的响应订阅

func (*EventBus) SubscribeWithResponseContext added in v0.2.0

func (e *EventBus) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error

SubscribeWithResponseContext 订阅支持context的响应处理器(默认优先级0)

func (*EventBus) SubscribeWithResponseContextAndPriority added in v0.2.0

func (e *EventBus) SubscribeWithResponseContextAndPriority(topic string, handler ResponseHandlerWithContext, priority int) error

SubscribeWithResponseContextAndPriority 带优先级的context响应订阅

func (*EventBus) Unsubscribe

func (e *EventBus) Unsubscribe(topic string, handler any) error

Unsubscribe 取消订阅

func (*EventBus) UnsubscribeAll added in v0.1.4

func (e *EventBus) UnsubscribeAll(topic string) error

UnsubscribeAll 取消主题的所有订阅

func (*EventBus) Use added in v0.1.1

func (e *EventBus) Use(middleware IMiddleware)

Use 添加中间件

type EventFilter added in v0.1.1

type EventFilter interface {
	// Filter 返回 true 表示事件可以继续传递,false 表示拦截该事件
	Filter(topic string, payload any) bool
}

EventFilter 定义事件过滤器接口

type EventTracer added in v0.1.3

type EventTracer interface {
	// OnPublish 在消息发布时调用
	OnPublish(topic string, payload any, metadata PublishMetadata)
	// OnSubscribe 在订阅时调用
	OnSubscribe(topic string, handler any)
	// OnUnsubscribe 在取消订阅时调用
	OnUnsubscribe(topic string, handler any)
	// OnError 在发生错误时调用
	OnError(topic string, err error)
	// OnComplete 在消息处理完成时调用
	OnComplete(topic string, metadata CompleteMetadata)
	// OnQueueFull 在队列满时调用
	OnQueueFull(topic string, size int)
	// OnSlowConsumer 在慢消费者时调用
	OnSlowConsumer(topic string, latency time.Duration)
}

EventTracer 定义事件追踪接口

type FilterFunc added in v0.2.0

type FilterFunc func(topic string, payload any) bool

FilterFunc 函数式过滤器适配器,便于直接使用函数

func (FilterFunc) Filter added in v0.2.0

func (f FilterFunc) Filter(topic string, payload any) bool

Filter 实现 EventFilter 接口

type Handler

type Handler[T any] func(payload T)

Handler 定义了一个函数类型,用于处理 T 类型的消息

type HandlerInfo added in v0.1.6

type HandlerInfo struct {
	Priority   int
	ID         uintptr // 用于唯一标识处理器
	IsResponse bool    // 标识是否为响应处理器
	// contains filtered or unexported fields
}

HandlerInfo 处理器信息,支持优先级

type HandlerResult added in v0.2.0

type HandlerResult struct {
	HandlerID string        `json:"handler_id"` // 处理器唯一标识
	Success   bool          `json:"success"`    // 执行是否成功
	Result    any           `json:"result"`     // 返回值
	Error     error         `json:"error"`      // 错误信息
	Duration  time.Duration `json:"duration"`   // 执行耗时
}

HandlerResult 处理器执行结果

type HandlerWithPriority added in v0.1.6

type HandlerWithPriority[T any] struct {
	Handler         Handler[T]
	ResponseHandler PipeResponseHandler[T]
	Priority        int
	ID              uintptr
}

HandlerWithPriority 带优先级的处理器

type IMiddleware added in v0.2.0

type IMiddleware interface {
	// Before 在事件处理前执行,可以修改payload
	Before(topic string, payload any) any
	// After 在事件处理后执行
	After(topic string, payload any)
}

IMiddleware 定义事件处理中间件接口

type LatencyStats added in v0.1.3

type LatencyStats struct {
	// contains filtered or unexported fields
}

LatencyStats 延迟统计

type MetricsTracer added in v0.1.3

type MetricsTracer struct {
	// contains filtered or unexported fields
}

MetricsTracer 实现基础的指标收集

func NewMetricsTracer added in v0.1.3

func NewMetricsTracer() *MetricsTracer

NewMetricsTracer 创建新的指标追踪器

func (*MetricsTracer) GetLatencyMetrics added in v0.1.3

func (m *MetricsTracer) GetLatencyMetrics() map[string]map[string]interface{}

GetLatencyMetrics 获取延迟指标

func (*MetricsTracer) GetMetrics added in v0.1.3

func (m *MetricsTracer) GetMetrics() map[string]interface{}

GetMetrics 获取当前指标

func (*MetricsTracer) GetQueueMetrics added in v0.1.3

func (m *MetricsTracer) GetQueueMetrics() map[string]map[string]int64

GetQueueMetrics 获取队列指标

func (*MetricsTracer) OnComplete added in v0.1.3

func (m *MetricsTracer) OnComplete(topic string, metadata CompleteMetadata)

OnComplete 在消息处理完成时调用

func (*MetricsTracer) OnError added in v0.1.3

func (m *MetricsTracer) OnError(topic string, err error)

OnError 在发生错误时调用

func (*MetricsTracer) OnPublish added in v0.1.3

func (m *MetricsTracer) OnPublish(topic string, payload any, metadata PublishMetadata)

OnPublish 在消息发布时调用

func (*MetricsTracer) OnQueueFull added in v0.1.3

func (m *MetricsTracer) OnQueueFull(topic string, size int)

OnQueueFull 在队列满时调用

func (*MetricsTracer) OnSlowConsumer added in v0.1.3

func (m *MetricsTracer) OnSlowConsumer(topic string, latency time.Duration)

OnSlowConsumer 在慢消费者时调用

func (*MetricsTracer) OnSubscribe added in v0.1.3

func (m *MetricsTracer) OnSubscribe(topic string, handler any)

OnSubscribe 在订阅时调用

func (*MetricsTracer) OnUnsubscribe added in v0.1.3

func (m *MetricsTracer) OnUnsubscribe(topic string, handler any)

OnUnsubscribe 在取消订阅时调用

type Middleware added in v0.1.1

type Middleware struct {
	// contains filtered or unexported fields
}

Middleware 提供基础性能统计与可选的负载转换能力

func NewMiddleware added in v0.2.0

func NewMiddleware() *Middleware

NewMiddleware 创建增强中间件实例

func (*Middleware) After added in v0.1.1

func (m *Middleware) After(topic string, payload any)

After 记录耗时

func (*Middleware) Before added in v0.1.1

func (m *Middleware) Before(topic string, payload any) any

Before 记录开始时间,可选地转换负载

func (*Middleware) GetStats added in v0.2.0

func (m *Middleware) GetStats() map[string]TopicStat

GetStats 获取性能统计快照

func (*Middleware) Reset added in v0.2.0

func (m *Middleware) Reset()

Reset 清空统计信息

func (*Middleware) SetTransformer added in v0.2.0

func (m *Middleware) SetTransformer(fn func(topic string, payload any) any)

SetTransformer 设置负载转换函数(可选)

type Pipe

type Pipe[T any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Pipe 优化后的泛型管道

func NewBufferedPipe

func NewBufferedPipe[T any](bufferSize int) *Pipe[T]

NewBufferedPipe 创建一个带指定缓冲区大小的管道

func NewBufferedPipeWithTimeout added in v0.2.0

func NewBufferedPipeWithTimeout[T any](bufferSize int, timeout time.Duration) *Pipe[T]

NewBufferedPipeWithTimeout 创建一个带缓冲的管道并自定义超时

func NewPipe

func NewPipe[T any]() *Pipe[T]

NewPipe 创建一个无缓冲的管道

func NewPipeWithTimeout added in v0.2.0

func NewPipeWithTimeout[T any](timeout time.Duration) *Pipe[T]

NewPipeWithTimeout 创建一个无缓冲管道并自定义超时

func (*Pipe[T]) Close

func (p *Pipe[T]) Close()

Close 关闭管道

func (*Pipe[T]) GetStats added in v0.1.6

func (p *Pipe[T]) GetStats() map[string]interface{}

GetStats 获取管道统计信息

func (*Pipe[T]) Publish

func (p *Pipe[T]) Publish(payload T) error

Publish 异步发送消息给所有订阅者

func (*Pipe[T]) PublishSync

func (p *Pipe[T]) PublishSync(payload T) error

PublishSync 同步发送消息给所有订阅者

func (*Pipe[T]) PublishSyncAll added in v0.2.0

func (p *Pipe[T]) PublishSyncAll(payload T) (*PipeSyncResult, error)

PublishSyncAll 响应式同步发布,要求所有处理器都成功

func (*Pipe[T]) PublishSyncAny added in v0.2.0

func (p *Pipe[T]) PublishSyncAny(payload T) (*PipeSyncResult, error)

PublishSyncAny 响应式同步发布,只要有一个处理器成功即可

func (*Pipe[T]) PublishWithContext added in v0.1.6

func (p *Pipe[T]) PublishWithContext(ctx context.Context, payload T) error

PublishWithContext 带上下文的异步发送

func (*Pipe[T]) Subscribe

func (p *Pipe[T]) Subscribe(handler Handler[T]) error

Subscribe 向管道添加一个处理器

func (*Pipe[T]) SubscribeWithPriority added in v0.1.6

func (p *Pipe[T]) SubscribeWithPriority(handler Handler[T], priority int) error

SubscribeWithPriority 向管道添加一个带优先级的处理器

func (*Pipe[T]) SubscribeWithResponse added in v0.2.0

func (p *Pipe[T]) SubscribeWithResponse(handler PipeResponseHandler[T]) (PipeResponseCancel, error)

SubscribeWithResponse 向管道添加一个响应式处理器

func (*Pipe[T]) SubscribeWithResponseAndPriority added in v0.2.0

func (p *Pipe[T]) SubscribeWithResponseAndPriority(handler PipeResponseHandler[T], priority int) (PipeResponseCancel, error)

SubscribeWithResponseAndPriority 向管道添加一个带优先级的响应式处理器

func (*Pipe[T]) Unsubscribe

func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error

Unsubscribe 从管道中移除一个处理器

type PipeHandlerResult added in v0.2.0

type PipeHandlerResult struct {
	HandlerID string        `json:"handler_id"` // 处理器唯一标识
	Success   bool          `json:"success"`    // 执行是否成功
	Result    any           `json:"result"`     // 返回值
	Error     error         `json:"error"`      // 错误信息
	Duration  time.Duration `json:"duration"`   // 执行耗时
}

PipeHandlerResult 处理器执行结果

type PipeResponseCancel added in v0.2.0

type PipeResponseCancel func()

PipeResponseCancel 用于撤销响应式处理器订阅

type PipeResponseHandler added in v0.2.0

type PipeResponseHandler[T any] func(payload T) (any, error)

PipeResponseHandler 定义了一个函数类型,用于处理 T 类型的消息并返回响应

type PipeSyncResult added in v0.2.0

type PipeSyncResult struct {
	Success      bool                `json:"success"`       // 整体是否成功
	HandlerCount int                 `json:"handler_count"` // 处理器总数
	SuccessCount int                 `json:"success_count"` // 成功数量
	FailureCount int                 `json:"failure_count"` // 失败数量
	Results      []PipeHandlerResult `json:"results"`       // 详细结果
	TotalTime    time.Duration       `json:"total_time"`    // 总耗时
}

PipeSyncResult 同步发布结果

type PublishMetadata added in v0.1.3

type PublishMetadata struct {
	Timestamp   time.Time // 发布时间
	Async       bool      // 是否异步发布
	QueueSize   int       // 队列大小
	PublisherID string    // 发布者ID(可选)
}

PublishMetadata 发布事件的元数据

type QueueStats added in v0.1.3

type QueueStats struct {
	// contains filtered or unexported fields
}

QueueStats 队列统计

type ResponseHandler added in v0.2.0

type ResponseHandler func(topic string, payload any) (any, error)

ResponseHandler 响应式处理器(不带context参数) 签名: func(topic string, payload any) (any, error)

type ResponseHandlerWithContext added in v0.2.0

type ResponseHandlerWithContext func(ctx context.Context, topic string, payload any) (any, error)

ResponseHandlerWithContext 响应式处理器(带context参数) 签名: func(ctx context.Context, topic string, payload any) (any, error)

type SmartFilter added in v0.2.0

type SmartFilter struct {
	// contains filtered or unexported fields
}

SmartFilter 提供基础限流与主题阻断能力

func NewSmartFilter added in v0.2.0

func NewSmartFilter() *SmartFilter

NewSmartFilter 创建智能过滤器,默认限流窗口为 1 分钟

func (*SmartFilter) BlockTopic added in v0.2.0

func (f *SmartFilter) BlockTopic(topic string)

BlockTopic 阻断指定主题(含其子层级)

func (*SmartFilter) Filter added in v0.2.0

func (f *SmartFilter) Filter(topic string, payload any) bool

Filter 实现 EventFilter 接口

func (*SmartFilter) SetLimit added in v0.2.0

func (f *SmartFilter) SetLimit(topic string, limit int)

SetLimit 设置指定主题的限流次数(窗口内最大可通过次数)

func (*SmartFilter) SetWindow added in v0.2.0

func (f *SmartFilter) SetWindow(window time.Duration)

SetWindow 调整限流窗口大小

func (*SmartFilter) UnblockTopic added in v0.2.0

func (f *SmartFilter) UnblockTopic(topic string)

UnblockTopic 解除主题阻断

type Subscription added in v0.1.1

type Subscription struct {
	Handler  any
	Priority int
}

Subscription 定义带优先级的订阅信息

type SyncResult added in v0.2.0

type SyncResult struct {
	Success      bool            `json:"success"`       // 整体是否成功
	HandlerCount int             `json:"handler_count"` // 处理器总数
	SuccessCount int             `json:"success_count"` // 成功数量
	FailureCount int             `json:"failure_count"` // 失败数量
	Results      []HandlerResult `json:"results"`       // 详细结果
	TotalTime    time.Duration   `json:"total_time"`    // 总耗时
}

SyncResult 同步发布结果

func PublishSyncAll added in v0.2.0

func PublishSyncAll(topic string, payload any) (*SyncResult, error)

PublishSyncAll 响应式同步发布(全部成功),使用全局单例

func PublishSyncAllWithContext added in v0.2.0

func PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAllWithContext 响应式同步发布(全部成功,透传上下文)

func PublishSyncAny added in v0.2.0

func PublishSyncAny(topic string, payload any) (*SyncResult, error)

PublishSyncAny 响应式同步发布(任一成功),使用全局单例

func PublishSyncAnyWithContext added in v0.2.0

func PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAnyWithContext 响应式同步发布(任一成功,透传上下文)

type TopicGroup added in v0.1.2

type TopicGroup struct {
	// contains filtered or unexported fields
}

TopicGroup 表示一个主题组

func NewGroup added in v0.2.0

func NewGroup(prefix string) *TopicGroup

NewGroup 基于全局单例创建主题组

func (*TopicGroup) Publish added in v0.1.2

func (g *TopicGroup) Publish(topic string, payload any) error

Publish 在组内发布消息

func (*TopicGroup) PublishSync added in v0.2.0

func (g *TopicGroup) PublishSync(topic string, payload any) error

PublishSync 在组内同步发布消息

func (*TopicGroup) PublishSyncAll added in v0.2.0

func (g *TopicGroup) PublishSyncAll(topic string, payload any) (*SyncResult, error)

PublishSyncAll 组内响应式同步发布(全部成功)

func (*TopicGroup) PublishSyncAllWithContext added in v0.2.0

func (g *TopicGroup) PublishSyncAllWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAllWithContext 组内响应式同步发布(全部成功,透传上下文)

func (*TopicGroup) PublishSyncAny added in v0.2.0

func (g *TopicGroup) PublishSyncAny(topic string, payload any) (*SyncResult, error)

PublishSyncAny 组内响应式同步发布(任一成功)

func (*TopicGroup) PublishSyncAnyWithContext added in v0.2.0

func (g *TopicGroup) PublishSyncAnyWithContext(ctx context.Context, topic string, payload any) (*SyncResult, error)

PublishSyncAnyWithContext 组内响应式同步发布(任一成功,透传上下文)

func (*TopicGroup) PublishSyncWithContext added in v0.2.0

func (g *TopicGroup) PublishSyncWithContext(ctx context.Context, topic string, payload any) error

PublishSyncWithContext 在组内带上下文同步发布

func (*TopicGroup) PublishWithContext added in v0.2.0

func (g *TopicGroup) PublishWithContext(ctx context.Context, topic string, payload any) error

PublishWithContext 在组内带上下文异步发布

func (*TopicGroup) Subscribe added in v0.1.2

func (g *TopicGroup) Subscribe(topic string, handler any) error

Subscribe 订阅组内的主题

func (*TopicGroup) SubscribeWithPriority added in v0.2.0

func (g *TopicGroup) SubscribeWithPriority(topic string, handler any, priority int) error

SubscribeWithPriority 组内带优先级订阅

func (*TopicGroup) SubscribeWithResponse added in v0.2.0

func (g *TopicGroup) SubscribeWithResponse(topic string, handler ResponseHandler) error

SubscribeWithResponse 组内响应式订阅

func (*TopicGroup) SubscribeWithResponseContext added in v0.2.0

func (g *TopicGroup) SubscribeWithResponseContext(topic string, handler ResponseHandlerWithContext) error

SubscribeWithResponseContext 组内带上下文的响应式订阅

func (*TopicGroup) Unsubscribe added in v0.1.2

func (g *TopicGroup) Unsubscribe(topic string, handler any) error

Unsubscribe 取消订阅组内的主题

func (*TopicGroup) UnsubscribeAll added in v0.2.0

func (g *TopicGroup) UnsubscribeAll(topic string) error

UnsubscribeAll 取消组内主题的所有订阅

type TopicStat added in v0.2.0

type TopicStat struct {
	Count     int
	TotalTime time.Duration
}

TopicStat 记录主题处理统计信息

Directories

Path Synopsis
examples
full command
response command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL