分享
  1. 首页
  2. 文章

golang socket连接复用 - smux

写个代码容易么 · · 1744 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

今天来介绍一个socket连接复用的包
https://github.com/xtaci/smux

如图所示,多个channel输入通过smux合并在一个连接中,后端服务将连接中的channel分离出来进行处理

smux.jpg

场景分析

假设一个简单的使用场景,一个apiservice网关服务对外提供HTTP接口,后面还有一个rand随机数服务,对内提供随机数TCP接口。

客户端访问apiservice接口,apiservice连接randservice服务获取数据并返回。如果不做多路复用的话,apiservice和randservice之间的连接数就是客户端请求数,这样apiservice和randservice之间连接过多会导致性能问题。

 n link n link
+-----------+ +-------------+ +---------------+
| <----------> <-----------> |
| client <----------> apiservice <-----------> randservice |
| <----------> <-----------> |
+-----------+ +-------------+ +---------------+

经过多路复用后,apiservice和randservice之间只有一个连接,这样无论多少个客户端请求都不会导致连接过多问题。

 n link 1 link
+-----------+ +-------------+ +---------------+
| <----------> | | |
| client <----------> apiservice <-----------> randservice |
| <----------> | | |
+-----------+ +-------------+ +---------------+

(当然这只是个示例场景而已,生产中apiservice和randservice之间使用RPC框架即可,不用我们手动写socket通信)

代码示例

1.随机数服务 randservice.go

package main
import (
 "bytes"
 "encoding/binary"
 "fmt"
 "github.com/rs/zerolog"
 "github.com/rs/zerolog/log"
 "github.com/xtaci/smux"
 "math/rand"
 "net"
 "runtime"
 "time"
)
func init() {
 rand.Seed(time.Now().UnixNano())
}
/**
一个生成随机数的tcp服务
客户端发送'R', 'A', 'N', 'D',服务返回一个随机数
*/
func main() {
 listener, err := net.Listen("tcp", ":9000")
 if err != nil {
 panic(err)
 }
 log.Info().Msg("随机数服务启动,监听9000端口")
 defer listener.Close()
 for {
 conn, err := listener.Accept()
 if err != nil {
 fmt.Println(err.Error())
 continue
 }
 go SessionHandler(conn)
 }
}
/**
处理会话
每个tcp连接生成一个会话session
*/
func SessionHandler(conn net.Conn) {
 session, err := smux.Server(conn, nil)
 if err != nil {
 panic(err)
 }
 log.Info().Msgf("收到客户端连接,创建新会话,对端地址:%s", session.RemoteAddr().String())
 for !session.IsClosed() {
 stream, err := session.AcceptStream()
 if err != nil {
 fmt.Println(err.Error())
 break
 }
 go StreamHandler(stream)
 }
 log.Info().Msgf("客户端连接断开,销毁会话,对端地址:%s", session.RemoteAddr().String())
}
/**
流数据处理
*/
func StreamHandler(stream *smux.Stream) {
 buffer := make([]byte, 1024)
 n, err := stream.Read(buffer)
 if err != nil {
 log.Error().Msgf("流id:%d,异常信息:%s", stream.ID(), err.Error())
 stream.Close()
 return
 }
 cmd := buffer[:n]
 if bytes.Equal(cmd, []byte{'R', 'A', 'N', 'D'}) {
 rand := rand.Uint64()
 response := make([]byte, 8)
 binary.BigEndian.PutUint64(response, rand)
 stream.Write(response)
 log.Debug().Msgf("收到客户端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
 } else {
 log.Warn().Msgf("收到未知请求命令,流id:%d,请求命令:%v", stream.ID(), cmd)
 }
}

2.api接口服务 apiservice.go

package main
import (
 "encoding/binary"
 "fmt"
 "github.com/rs/zerolog"
 "github.com/rs/zerolog/log"
 "github.com/xtaci/smux"
 "net"
 "net/http"
 "runtime"
)
/**
随机数服务客户端连接
*/
var randClient *smux.Session
func init() {
 //连接后端随机数服务
 conn, err := net.Dial("tcp", ":9000")
 if err != nil {
 log.Warn().Msg("随机数服务未启动")
 panic(err)
 }
 session, err := smux.Client(conn, nil)
 if err != nil {
 log.Error().Msg("打开会话失败")
 panic(err)
 }
 randClient = session
}
/**
一个api网关,对外提供api接口
调用随机数服务来获取随机数
*/
func main() {
 defer randClient.Close()
 http.HandleFunc("/rand", RandHandler)
 http.ListenAndServe(":8080", nil)
}
/**
随机数接口
*/
func RandHandler(w http.ResponseWriter, r *http.Request) {
 stream, err := randClient.OpenStream()
 if err != nil {
 w.WriteHeader(500)
 fmt.Fprint(w, err.Error())
 } else {
 log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())
 defer stream.Close()
 stream.Write([]byte{'R', 'A', 'N', 'D'})
 buffer := make([]byte, 1024)
 n, err := stream.Read(buffer)
 if err != nil {
 w.WriteHeader(500)
 fmt.Fprint(w, err.Error())
 } else {
 response := buffer[:n]
 var rand = binary.BigEndian.Uint64(response)
 log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
 fmt.Fprintf(w, "%d", rand)
 }
 }
}

原理分析

smux将socket连接封装成session,每次请求响应封装成一个stream,通过自定义协议发送数据

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH) 
VALUES FOR LATEST VERSION:
VERSION:
 1/2
 
CMD:
 cmdSYN(0)
 cmdFIN(1)
 cmdPSH(2)
 cmdNOP(3)
 cmdUPD(4) // only supported on version 2
 
STREAMID:
 client use odd numbers starts from 1
 server use even numbers starts from 0
 
cmdUPD:
 | CONSUMED(4B) | WINDOW(4B) |

比如我们发送的RAND命令封装成以下数据包发送给服务端,假设请求的STREAMID为11223344

VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | RAND
VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | 0102030405060708

扩展优化

但是这样又导致了另一个问题,由于apiservice和randservice之间只有一个连接,而这一个连接只能由一个goroutine处理,这样就导致性能低下
所以进一步扩展apiservice和randservice之间建立固定数量的连接,如10个连接,用来处理所有的请求,就是通过连接池的方式来性能最大化

改造后的示意图如下:

 n link 10 link
+-----------+ +-------------+ +---------------+
| <----------> <-----------> |
| client <----------> apiservice <-----------> randservice |
| <----------> <-----------> |
+-----------+ +-------------+ +---------------+

连接池版代码 apiservicewithpool.go

package main
import (
 "context"
 "encoding/binary"
 "fmt"
 cpool "github.com/jolestar/go-commons-pool/v2"
 "github.com/rs/zerolog"
 "github.com/rs/zerolog/log"
 "github.com/xtaci/smux"
 "net"
 "net/http"
 "runtime"
)
var commonPool *cpool.ObjectPool
var ctx = context.Background()
func init() {
 factory := cpool.NewPooledObjectFactorySimple(NewSessionCpool)
 commonPool = cpool.NewObjectPoolWithDefaultConfig(ctx, factory)
 commonPool.Config.MaxTotal = 10
}
/**
连接池生成新会话函数
*/
func NewSessionCpool(ctx context.Context) (interface{}, error) {
 log.Debug().Msg("连接池中生成一个连接")
 //连接后端随机数服务
 conn, err := net.Dial("tcp", ":9000")
 if err != nil {
 log.Warn().Msg("随机数服务未启动")
 panic(err)
 }
 //随机数服务客户端连接
 session, err := smux.Client(conn, nil)
 if err != nil {
 log.Error().Msg("打开会话失败")
 panic(err)
 }
 return session, err
}
/**
一个api网关,对外提供api接口
调用随机数服务来获取随机数
通过sync.Pool实现"连接池" !!! 不推荐这种方式,sync.Pool的种种特性不适合作为连接池
*/
func main() {
 http.HandleFunc("/rand", CommonPoolRandHandler)
 http.ListenAndServe(":8080", nil)
}
/**
随机数接口
*/
func CommonPoolRandHandler(w http.ResponseWriter, r *http.Request) {
 obj, err := commonPool.BorrowObject(ctx)
 if err != nil {
 w.WriteHeader(500)
 fmt.Fprint(w, err.Error())
 return
 }
 client := obj.(*smux.Session)
 stream, err := client.OpenStream()
 if err != nil {
 w.WriteHeader(500)
 fmt.Fprint(w, err.Error())
 } else {
 log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID())
 defer stream.Close()
 stream.Write([]byte{'R', 'A', 'N', 'D'})
 buffer := make([]byte, 1024)
 n, err := stream.Read(buffer)
 if err != nil {
 w.WriteHeader(500)
 fmt.Fprint(w, err.Error())
 } else {
 response := buffer[:n]
 var rand = binary.BigEndian.Uint64(response)
 log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response)
 fmt.Fprintf(w, "%d", rand)
 }
 }
 commonPool.ReturnObject(ctx, obj)
}

经过连接池改造后的模型就像MySQL或Redis的使用场景,每次请求相当于一个stream,多个stream共用一个session,一个session背后有一个socket连接,程序和MySQL或Redis之间创建多个session放入连接池中,每次请求从连接池中拿出session进行读写操作


有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:写个代码容易么

查看原文:golang socket连接复用 - smux

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1744 次点击
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏