分享
  1. 首页
  2. 文章

使用 Go 语言实现完整且轻量级高性能的 MQTT Broker

qq8864 · · 342 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

> MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息传输协议。但是目前虽然mqtt的客户端很多,但是服务端着实不多,常见的服务端如mosquitto或emqx。但是golang语言的实现几乎找不到。golang的轻量级部署和高并发高性能,很合适做mqtt Broker。本文将详细介绍如何使用 Go 语言实现一个简单轻量级且高性能的 MQTT Broker,并涵盖MQTT3.1.1协议的核心特性和完整功能。 ### 1. 需求分析 本文选择golang语言实现一个完整的 MQTT 3.1.1 Broker,不涉及集群支持和协议版本检测。简单且轻量级,不但可以替代mosquitto,后续还可以灵活的做扩展,如增加webUI的管理界面。且部署也很简单,一个exe可执行文件。 完整项目开源地址:[https://github.com/yangyongzhen/goang-mqtt-broker](https://github.com/yangyongzhen/goang-mqtt-broker) gitee: [https://gitee.com/yyz116/goang-mqtt-broker](https://gitee.com/yyz116/goang-mqtt-broker) 可执行文件在release目录下。 #### 1.1 实现效果截图 服务启动: ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/3bf23e9081464e21af84e01382c40423.png) 客户端发布: ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/e2747610f37c4a6b9a5e68fa83c6e748.png) 客户端订阅: ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/d2af79628209454c9b2277adf04084e4.png) 使用mosquitto客户端测试效果: ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/a3d59b8d2a72435a94866266ab56609d.png) ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/38415ffdfc3f493882baedd2c2610927.png) 优化增加基于redis的持久化存储: 可在etc/config.yaml文件中配置是否启用redis的持久化。默认基于内存。 ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/dcfd75f28ac74770ab258464ca43c0c5.png) windows下的可执行文件仅有7M左右大小,简单小巧。且代码开源方便定制。可以作为替代mosquitto的另外一种选择。 ![在这里插入图片描述](https://wsrv.nl/?url=https://i-blog.csdnimg.cn/direct/00e4c627cd6a4fdd8b4dfa9f7f3217b1.png) #### 1.2 功能特性 ##### 1.2.1核心功能 - ✅ **完整的 MQTT 3.1.1 协议支持** - ✅ **QoS 0, 1, 2 消息传递保证** - ✅ **会话管理**(持久会话和清理会话) - ✅ **保留消息**(Retained Messages) - ✅ **遗嘱消息**(Last Will and Testament) - ✅ **主题通配符**(+ 和 # 通配符支持) - ✅ **客户端认证**(用户名/密码) - ✅ **保活机制**(Keep Alive) - ✅ **并发安全** ##### 1.2.2 架构特性 - 🏗️ **模块化设计**,易于扩展 - 🔌 **可插拔存储接口** - 🔒 **线程安全**的并发处理 - 📊 **内置监控指标** - 🐳 **Docker 支持** ### 2. 项目架构设计 ### 架构设计 ![](https://i-blog.csdnimg.cn/direct/ca6f560770414da5a4e7cbc7cf401bb1.png) **数据流** 客户端连接 → TCP Server 接受连接 协议解析 → Client 解析 MQTT 数据包 认证验证 → Auth 模块验证用户凭据 会话管理 → Storage 加载/保存会话信息 消息路由 → Broker 根据订阅关系路由消息 主题匹配 → Topic Manager 处理通配符匹配 #### 2.1 目录结构 ``` mqtt-broker/ ├── README.md ├── Makefile ├── Dockerfile ├── go.mod ├── go.sum ├── cmd/ │ ├── broker/ │ │ └── main.go │ └── test-client/ │ └── main.go ├── internal/ │ ├── auth/ │ │ └── auth.go │ ├── broker/ │ │ ├── broker.go │ │ ├── client.go │ │ └── topic.go │ ├── protocol/ │ │ ├── common/ │ │ │ └── types.go │ │ └── mqtt311/ │ │ └── packet.go │ └── storage/ │ ├── interface.go │ └── memory/ │ └── store.go └── pkg/ └── mqtt/ └── packet.go ``` #### 2.2 主要模块 - **cmd/broker/main.go**:程序入口。 - **internal/broker/**:Broker 核心逻辑,包括连接管理、消息路由等。 - **internal/storage/**:存储接口和内存实现。 - **pkg/mqtt/packet.go**:MQTT 数据包编码和解码。 ### 3. 核心实现 #### 3.1 存储接口 在 `internal/storage/interface.go` 文件中定义存储接口: ```go package storage import ( "github.com/yangyongzhen/mqtt-broker/internal/protocol/common" ) type Store interface { SaveSession(clientID string, session *Session) error LoadSession(clientID string) (*Session, error) DeleteSession(clientID string) error SaveMessage(clientID string, message *common.Message) error LoadMessages(clientID string) ([]*common.Message, error) DeleteMessage(clientID string, packetID uint16) error SaveRetainedMessage(topic string, message *common.Message) error LoadRetainedMessage(topic string) (*common.Message, error) DeleteRetainedMessage(topic string) error SaveSubscription(clientID string, subscription *common.Subscription) error LoadSubscriptions(clientID string) ([]*common.Subscription, error) DeleteSubscription(clientID string, topic string) error } type Session struct { ClientID string CleanSession bool Subscriptions map[string]*common.Subscription PendingAcks map[uint16]*common.Message LastSeen time.Time } ``` #### 3.2 内存存储实现 在 `internal/storage/memory/store.go` 文件中实现内存存储: ```go package memory import ( "sync" "github.com/yangyongzhen/mqtt-broker/internal/storage" "github.com/yangyongzhen/mqtt-broker/internal/protocol/common" ) type MemoryStore struct { sessions map[string]*storage.Session retainedMsgs map[string]*common.Message clientMessages map[string][]*common.Message mu sync.RWMutex } func NewMemoryStore() *MemoryStore { return &MemoryStore{ sessions: make(map[string]*storage.Session), retainedMsgs: make(map[string]*common.Message), clientMessages: make(map[string][]*common.Message), } } func (m *MemoryStore) SaveSession(clientID string, session *storage.Session) error { m.mu.Lock() defer m.mu.Unlock() m.sessions[clientID] = session return nil } func (m *MemoryStore) LoadSession(clientID string) (*storage.Session, error) { m.mu.RLock() defer m.mu.RUnlock() session, exists := m.sessions[clientID] if !exists { return nil, nil } return session, nil } // 其他方法省略... ``` #### 3.3 客户端连接管理 在 `internal/broker/client.go` 文件中实现客户端连接管理: ```go package broker import ( "bufio" "fmt" "net" "sync" "time" "github.com/yangyongzhen/mqtt-broker/internal/protocol/common" "github.com/yangyongzhen/mqtt-broker/internal/protocol/mqtt311" "github.com/yangyongzhen/mqtt-broker/internal/storage" "github.com/yangyongzhen/mqtt-broker/pkg/mqtt" ) type Client struct { conn net.Conn clientID string info *common.ClientInfo session *storage.Session broker *Broker packetReader *mqtt.PacketReader writeChan chan []byte closeChan chan struct{} keepAliveTimer *time.Timer mu sync.RWMutex connected bool nextPacketID uint16 pendingAcks map[uint16]*PendingMessage } type PendingMessage struct { Message *common.Message Timestamp time.Time Retries int } func NewClient(conn net.Conn, broker *Broker) *Client { return &Client{ conn: conn, broker: broker, packetReader: mqtt.NewPacketReader(conn), writeChan: make(chan []byte, 1000), closeChan: make(chan struct{}), pendingAcks: make(map[uint16]*PendingMessage), nextPacketID: 1, } } func (c *Client) Start() { go c.readLoop() go c.writeLoop() go c.retryLoop() } func (c *Client) readLoop() { defer c.Close() for { select { case <-c.closeChan: return default: packet, err := c.packetReader.ReadPacket() if err != nil { fmt.Printf("Read packet error: %v\n", err) return } c.handlePacket(packet) } } } func (c *Client) writeLoop() { defer c.Close() for { select { case data := <-c.writeChan: if _, err := c.conn.Write(data); err != nil { fmt.Printf("Write error: %v\n", err) return } case <-c.closeChan: return } } } func (c *Client) retryLoop() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: c.retryPendingMessages() case <-c.closeChan: return } } } func (c *Client) handlePacket(packet common.Packet) { switch p := packet.(type) { case *mqtt311.ConnectPacket: c.handleConnect(p) case *mqtt311.PublishPacket: c.handlePublish(p) case *mqtt311.SubscribePacket: c.handleSubscribe(p) case *mqtt311.UnsubscribePacket: c.handleUnsubscribe(p) case *mqtt311.PingreqPacket: c.handlePingReq() case *mqtt311.DisconnectPacket: c.handleDisconnect() } } // handleConnect, handlePublish 等其他方法省略... ``` #### 3.4 主 Broker 实现 在 `internal/broker/broker.go` 文件中实现主 Broker 的逻辑: ```go package broker import ( "fmt" "net" "sync" "time" "github.com/yangyongzhen/mqtt-broker/internal/auth" "github.com/yangyongzhen/mqtt-broker/internal/protocol/common" "github.com/yangyongzhen/mqtt-broker/internal/storage" ) type Broker struct { listener net.Listener clients map[string]*Client topicManager *TopicManager store storage.Store auth auth.Authenticator mu sync.RWMutex running bool config *Config } type Config struct { MaxConnections int MaxMessageSize int RetainedMsgLimit int SessionExpiry time.Duration MessageExpiry time.Duration } func NewBroker(store storage.Store, authenticator auth.Authenticator) *Broker { return &Broker{ clients: make(map[string]*Client), topicManager: NewTopicManager(), store: store, auth: authenticator, config: &Config{ MaxConnections: 10000, MaxMessageSize: 1024 * 1024, RetainedMsgLimit: 10000, SessionExpiry: 24 * time.Hour, MessageExpiry: 24 * time.Hour, }, } } func (b *Broker) Start(address string) error { listener, err := net.Listen("tcp", address) if err != nil { return err } b.listener = listener b.running = true fmt.Printf("MQTT Broker started on %s\n", address) for b.running { conn, err := listener.Accept() if err != nil { if b.running { fmt.Printf("Accept error: %v\n", err) } continue } client := NewClient(conn, b) go client.Start() } return nil } func (b *Broker) Stop() { b.running = false if b.listener != nil { b.listener.Close() } b.mu.Lock() defer b.mu.Unlock() for _, client := range b.clients { client.Close() } } // AddClient, RemoveClient, PublishMessage 等其他方法省略... ``` #### 3.5 主程序入口 在 `cmd/broker/main.go` 文件中定义主程序入口: ```go package main import ( "flag" "fmt" "log" "os" "os/signal" "syscall" "github.com/yangyongzhen/mqtt-broker/internal/auth" "github.com/yangyongzhen/mqtt-broker/internal/broker" "github.com/yangyongzhen/mqtt-broker/internal/storage/memory" ) func main() { addr := flag.String("addr", ":1883", "MQTT broker address") flag.Parse() authenticator := auth.NewSimpleAuthenticator() // 示例认证器,需要自行实现 store := memory.NewMemoryStore() b := broker.NewBroker(store, authenticator) go func() { if err := b.Start(*addr); err != nil { log.Fatalf("Failed to start MQTT broker: %v", err) } }() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan b.Stop() fmt.Println("MQTT broker stopped") } ``` 以上代码是实现一个简单的 MQTT Broker 的基础框架,更多详细功能和性能优化可以根据实际需求进行扩展和改进。 ### 安装和运行 1. **克隆项目** ```bash git clone <your-repo-url> cd mqtt-broker go mod tidy ``` ### 安装依赖 ```bash go mod tidy ``` 构建项目 ```bash make build ``` #### 或者 ```bash go build -o bin/mqtt-broker cmd/broker/main.go 运行 Broker ``` ```bash make run ``` #### 或者 ```bash ./bin/mqtt-broker -addr=:1883 -debug ``` 使用 Docker ##### 构建镜像 ```bash docker build -t mqtt-broker . #### 运行容器 docker run -p 1883:1883 mqtt-broker #### 使用示例 **启动 Broker** #### 默认端口 1883 go run cmd/broker/main.go ##### 自定义端口和调试模式 go run cmd/broker/main.go -addr=:1883 -debug ``` ### 测试客户端 项目包含一个简单的测试客户端,可以用来测试 broker 功能: 订阅消息: ```bash go run cmd/test-client/main.go -mode=sub -topic=test/hello -client=subscriber1 ``` 发布消息: ```bash go run cmd/test-client/main.go -mode=pub -topic=test/hello -msg="Hello MQTT!" -client=publisher1 ``` ### 使用第三方客户端 你也可以使用任何标准的 MQTT 客户端连接到 broker: 使用 mosquitto 客户端: #### 订阅 ```bash mosquitto_sub -h localhost -p 1883 -t "test/topic" ``` #### 发布 ```bash mosquitto_pub -h localhost -p 1883 -t "test/topic" -m "Hello World" ``` 使用认证: #### 默认用户: admin/password, test/test123 mosquitto_pub -h localhost -p 1883 -u admin -P password -t "test/topic" -m "Authenticated message" 配置说明 命令行参数 参数 默认值 说明 -addr :1883 Broker 监听地址 -debug false 启用调试日志 #### 内置用户 Broker 默认创建了以下测试用户: 用户名 密码 admin password test test123 项目开源地址: [https://github.com/yangyongzhen/goang-mqtt-broker](https://github.com/yangyongzhen/goang-mqtt-broker) gitee: [https://gitee.com/yyz116/goang-mqtt-broker](https://gitee.com/yyz116/goang-mqtt-broker) ### 作者 作者csdn猫哥,转载请注明出处: [https://blog.csdn.net/yyz_1987](https://blog.csdn.net/yyz_1987)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
342 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏