Pushlet 是一个基于 Go 语言的轻量级实时消息推送库,同时支持 Server-Sent Events (SSE) 和 WebSocket 两种协议。它支持单机和分布式部署模式,是构建实时通知、事件流和数据更新等功能的理想选择。
- 🚀 双协议支持 - 同时支持 SSE 和 WebSocket 协议
- 📡 多主题订阅 - 支持基于主题的消息订阅和发布
- 🌐 分布式架构 - 通过 Redis 实现多实例间的消息同步
- 💪 动态订阅 - WebSocket 客户端可通过消息动态订阅/取消订阅主题
- 🔒 二进制传输 - WebSocket 使用二进制格式传输,提高效率
- ❤️ 心跳保活 - 自动发送心跳消息保持连接活跃
- 🔧 简单易用 - 简洁的 API 设计,易于集成到现有项目
- ⚡ 低延迟 - 消息实时推送,适合需要即时反馈的场景
- 🛡️ 高可靠 - 断线自动重连,消息不丢失
- 📊 可观测 - 内置日志系统,支持自定义日志记录器
go get github.com/usual2970/pushlet
package main import ( "log" "net/http" "time" "github.com/usual2970/pushlet" ) func main() { // 创建 Pushlet 实例 p := pushlet.New() // 设置心跳间隔 p.SetHeartbeatInterval(30 * time.Second) // 启动消息代理 p.Start() defer p.Stop() // SSE 端点 http.HandleFunc("/events", p.HandleSSE) // WebSocket 端点 http.HandleFunc("/ws", p.HandleWebSocket) // 消息发送接口 http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { topic = "default" } message := r.URL.Query().Get("message") // 同时发送到 SSE 和 WebSocket 客户端 p.Publish(topic, "message", message) w.Write([]byte("Message sent")) }) log.Println("Server started at http://localhost:8080") log.Println("SSE endpoint: http://localhost:8080/events") log.Println("WebSocket endpoint: ws://localhost:8080/ws") log.Fatal(http.ListenAndServe(":8080", nil)) }
package main import ( "log" "net/http" "github.com/usual2970/pushlet" ) func main() { // 创建 Pushlet 实例 p := pushlet.New() // 启用分布式模式 err := p.EnableDistributedMode("localhost:6379", "", 0) if err != nil { log.Fatalf("Failed to enable distributed mode: %v", err) } // 启动消息代理 p.Start() defer p.Stop() // 处理连接请求 http.HandleFunc("/events", p.HandleSSE) // SSE http.HandleFunc("/ws", p.HandleWebSocket) // WebSocket // 消息发送接口 http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) { topic := r.URL.Query().Get("topic") if topic == "" { topic = "default" } message := r.URL.Query().Get("message") p.Publish(topic, "message", message) w.Write([]byte("Message sent to all instances")) }) log.Println("Distributed server started at http://localhost:8080") log.Fatal(http.ListenAndServe(":8080", nil)) }
| 特性 | SSE | WebSocket |
|---|---|---|
| 传输方向 | 单向(服务器到客户端) | 双向 |
| 数据格式 | 文本(事件流) | 二进制 JSON |
| 订阅方式 | URL 参数指定主题 | 消息动态订阅 |
| 浏览器支持 | 现代浏览器原生支持 | 现代浏览器原生支持 |
| 连接开销 | 低 | 低 |
| 协议复杂度 | 简单 | 中等 |
| 断线重连 | 浏览器自动处理 | 需要手动处理 |
| 多主题支持 | 一个连接一个主题 | 一个连接多个主题 |
<!DOCTYPE html> <html> <head> <title>SSE 客户端</title> </head> <body> <div id="messages"></div> <script> // 连接到特定主题 const evtSource = new EventSource("/events?topic=my-topic"); evtSource.addEventListener("message", function(e) { const div = document.createElement("div"); div.textContent = `SSE 消息: ${e.data}`; document.getElementById("messages").appendChild(div); }); evtSource.onerror = function() { console.log("SSE 连接错误,正在重新连接..."); }; </script> </body> </html>
<!DOCTYPE html> <html> <head> <title>WebSocket 动态订阅客户端</title> </head> <body> <div id="messages"></div> <button onclick="subscribe('topic1')">订阅 topic1</button> <button onclick="unsubscribe('topic1')">取消订阅 topic1</button> <script> const ws = new WebSocket("ws://localhost:8080/ws"); ws.onmessage = function(event) { if (event.data instanceof Blob) { // 处理二进制数据 event.data.arrayBuffer().then(buffer => { const decoder = new TextDecoder(); const jsonText = decoder.decode(buffer); const msg = JSON.parse(jsonText); const div = document.createElement("div"); div.textContent = `[${msg.event}] ${msg.data}`; document.getElementById("messages").appendChild(div); }); } }; function subscribe(topic) { ws.send(JSON.stringify({ action: 'subscribe', topic: topic })); } function unsubscribe(topic) { ws.send(JSON.stringify({ action: 'unsubscribe', topic: topic })); } </script> </body> </html>
package main import ( "encoding/json" "log" "net/url" "github.com/gorilla/websocket" ) type Message struct { Event string `json:"event"` Data string `json:"data"` Timestamp time.Time `json:"timestamp"` } type SubscriptionMessage struct { Action string `json:"action"` // "subscribe" 或 "unsubscribe" Topic string `json:"topic"` } func main() { u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"} c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { log.Fatal("连接失败:", err) } defer c.Close() // 订阅主题 subMsg := SubscriptionMessage{ Action: "subscribe", Topic: "my-topic", } c.WriteJSON(subMsg) for { msgType, message, err := c.ReadMessage() if err != nil { log.Println("读取消息出错:", err) break } if msgType == websocket.BinaryMessage { // 处理二进制消息 var msg Message if err := json.Unmarshal(message, &msg); err == nil { log.Printf("[%s] %s", msg.Event, msg.Data) } } } }
创建一个新的 Pushlet 实例,支持选项配置。
配置自定义日志记录器。
设置心跳间隔时间。
启动消息代理,开始处理消息。
停止消息代理,关闭所有连接。
启用分布式模式,通过 Redis 同步消息。
处理 SSE 连接请求,支持通过 topic 查询参数指定主题。
处理 WebSocket 连接请求,支持动态主题订阅。
向指定主题发布消息。
向所有主题发布消息。
event: message
data: Hello, World!
{
"action": "subscribe",
"topic": "my-topic"
}{
"event": "message",
"data": "Hello, World!",
"timestamp": "2024年01月01日T12:00:00Z"
}WebSocket 客户端可以在连接后动态管理订阅:
// 订阅主题 ws.send(JSON.stringify({ action: 'subscribe', topic: 'user-notifications' })); // 取消订阅 ws.send(JSON.stringify({ action: 'unsubscribe', topic: 'user-notifications' }));
pushlet/
├── broker.go # 消息代理核心逻辑
├── client.go # 客户端连接管理
├── logger.go # 日志系统
├── message.go # 消息结构定义
├── pushlet.go # 主要 API 入口
├── redis_connector.go # Redis 分布式支持
├── go.mod # Go 模块定义
└── example/ # 使用示例
└── dual_protocol.go
- WebSocket 二进制传输:相比文本传输减少约 20-30% 的数据量
- 心跳保活:防止代理服务器超时,提高连接稳定性
- 分布式架构:水平扩展支持更多并发连接
- 动态订阅:一个 WebSocket 连接可管理多个主题,减少连接数
- 📱 实时通知系统 - 用户消息、系统通知
- 📊 实时数据监控 - 服务器状态、性能指标
- 💬 聊天应用 - 消息推送、在线状态
- 🎮 实时游戏 - 游戏状态同步
- 📈 股票行情 - 实时价格推送
- 🔔 事件提醒 - 任务提醒、日程通知
- 只需单向推送时使用 SSE:简单的通知、状态更新
- 需要双向通信或多主题管理时使用 WebSocket:复杂的实时应用
- 生产环境建议使用 Redis 集群
- 设置合适的心跳间隔(建议 30-60 秒)
- 考虑负载均衡和故障转移
- 客户端应实现重连逻辑
- 服务端应处理连接异常
- 使用自定义日志记录器监控系统状态
MIT License - 详见 LICENSE 文件
欢迎提交 Issue 和 Pull Request 来改进这个项目!
- Fork 本仓库
- 创建你的特性分支 (
git checkout -b feature/AmazingFeature) - 提交你的更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 打开一个 Pull Request
如果你在使用过程中遇到问题或有建议,请:
- 提交 GitHub Issue
- 查看 文档和示例
Pushlet - 让实时消息推送变得简单高效 🚀
message := r.URL.Query().Get("message")
p.Publish(topic, "message", message)
w.Write([]byte("Message sent"))
})
log.Println("Server started at http://localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
### 分布式部署
```go
package main
import (
"log"
"net/http"
"github.com/usual2970/pushlet"
)
func main() {
// 创建 Pushlet 实例
p := pushlet.New()
// 启用分布式模式
err := p.EnableDistributedMode("localhost:6379", "", 0)
if err != nil {
log.Fatalf("Failed to enable distributed mode: %v", err)
}
// 启动消息代理
p.Start()
defer p.Stop()
// 处理 SSE 连接请求
http.HandleFunc("/events", p.HandleSSE)
// 消息发送接口
http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
topic := r.URL.Query().Get("topic")
if topic == "" {
topic = "default"
}
message := r.URL.Query().Get("message")
p.Publish(topic, "message", message)
w.Write([]byte("Message sent to all instances"))
})
log.Println("Server started at http://localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
<!DOCTYPE html> <html> <head> <title>Pushlet 客户端示例</title> </head> <body> <h1>Pushlet 消息接收器</h1> <div id="messages"></div> <script> const evtSource = new EventSource("/events"); const messagesDiv = document.getElementById("messages"); // 监听默认消息 evtSource.addEventListener("message", function(e) { const newElement = document.createElement("div"); newElement.textContent = `收到消息: ${e.data}`; messagesDiv.appendChild(newElement); }); // 监听自定义事件 evtSource.addEventListener("custom-event", function(e) { const newElement = document.createElement("div"); newElement.textContent = `收到自定义事件: ${e.data}`; newElement.style.color = "blue"; messagesDiv.appendChild(newElement); }); // 处理连接错误 evtSource.onerror = function() { const newElement = document.createElement("div"); newElement.textContent = "连接错误,正在重新连接..."; newElement.style.color = "red"; messagesDiv.appendChild(newElement); }; </script> </body> </html>
创建一个新的 Pushlet 实例。
启动消息代理,开始处理消息。
停止消息代理,关闭所有连接。
启用分布式模式,通过 Redis 同步消息。
处理 SSE 连接请求,建立实时消息通道。
向指定主题发布消息。
向所有主题发布消息。
// 发送自定义事件类型 p.Publish("user-notifications", "user-login", "User John has logged in")
// 客户端监听自定义事件 evtSource.addEventListener("user-login", function(e) { console.log("User login event:", e.data); });
// 主题可以用路径格式组织 p.Publish("users/123/notifications", "new-message", "你有一条新消息")
对于高并发场景,建议使用分布式模式并结合负载均衡:
// 启用分布式模式 p.EnableDistributedMode("redis-server:6379", "password", 0) // 增加缓冲区大小以处理更多连接 http.ListenAndServe(":8080", nil)
- SSE 连接会占用服务器资源,建议在高负载场景使用分布式部署
- 考虑为长期空闲的连接设置超时机制
- 对于超大规模部署,考虑使用消息队列作为中间层
欢迎提交 Issue 和 Pull Request 来改进这个项目。贡献前请确保:
- 代码风格符合 Go 的规范
- 添加测试用例
- 更新文档
MIT
感谢所有贡献者以及 Go 社区的支持。