分享
  1. 首页
  2. 文章

一个TCP长连接设备管理后台工程(五)

qiuzhiqian · · 1046 次点击 · · 开始浏览
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

前篇

一个TCP长连接设备管理后台工程(一)
一个TCP长连接设备管理后台工程(二)
一个TCP长连接设备管理后台工程(三)
一个TCP长连接设备管理后台工程(四)

Github仓库地址

帧过滤器

帧过滤器的作用就是,从接收到的buff中,过滤出有效的完整jtt808数据包。由于是tcp通讯,那么这其中不可避免的会涉及到数据包的两个常规处理:拆包和粘包。

拆包和粘包的简要说明:

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
(1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
(2)服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
(3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
(4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。
如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

拆包与粘包的说明网上资料很多,此处不做过多说明。但是我们设计出来的过滤器,要能够正常应对拆包和粘包的情况。

首先,我们根据jtt808协议定义我们的数据包结构:

type MultiField struct {
 MsgSum uint16
 MsgIndex uint16
}
type Header struct {
 MID uint16
 Attr uint16
 Version uint8
 PhoneNum string
 SeqNum uint16
 MutilFlag MultiField
}
type Message struct {
 HEADER Header
 BODY []byte
}

由于Attr其实是由多个位域字段组成,所以我们再定义三个函数:

func (h *Header) IsMulti() bool {
 if ((h.Attr >> 12) & 0x0001) > 0 {
 return true
 }
 return false
}
//BodyLen is a function for get body len
func (h *Header) BodyLen() int {
 return int(h.Attr & 0x03ff)
}
//MakeAttr is generate attr
func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 {
 attr := lens & 0x03FF
 if verFlag > 0 {
 attr = attr & 0x4000
 }
 if mut {
 attr = attr & 0x2000
 }
 encMask := (uint16(enc) & 0x0007) << 10
 return attr + encMask
}

由于要考虑拆包和粘包问题,所以我们的过滤器需要能够同时分析多包数据,但是基本单元函数分析一帧数据,所以我们先实现一帧数据的过滤器:filterSigle

我们想要的过滤器原型是如下的一个函数:

func filterSigle(data []byte) (Message, int, error)

该函数接收一个存有从tcp端接收的数据流切片,需要返回我们解析出来的Message、解析过后消耗的字节数,错误信息。

很明显,这个返回的消耗字节数就是为了应对拆包和粘包用的。

我们知道jtt808协议是以0x7e开始和结尾的,我们定义一个常量:

const (
 ProtoHeader byte = 0x7e
)

filterSigle的第一个逻辑就是要识别帧头和帧尾了:

var usedLen int = 0
startindex := bytes.IndexByte(data, ProtoHeader)
if startindex >= 0 {
 usedLen = startindex + 1
 endindex := bytes.IndexByte(data[usedLen:], ProtoHeader)
 if endindex >= 0 {
 endindex = endindex + usedLen
 }
}

此处的帧头和帧尾索引均相对于data的起始字节而言。理想的情况下,在startindex和endindex之间的数据就是我们需要解析的数据,所以之后的解析都是对这部分数据进行分析。我们对这部分的逻辑单独用一个函数来处理,这个函数主要完成三个逻辑:转义、校验检查和解析

func frameParser(data []byte) (Message, error) {
}

data入参从startindex到endindex,不包括startindex和endindex。

在转义之前,首先判断基本的长度。通过对帧头固定字段分析可以知道,消息头部分的长度为17或者19个字节,加上帧头帧尾校验位的话最小就是17+3=20个字节,鉴于BODY部分可能为空,所以整个帧最小长度应该为20字节:

if len(data)+2 < 17+3 {
 return Message{}, fmt.Errorf("header is too short")
}

转义比较简单,为了代码复用,定义成一个函数:

func Escape(data, oldBytes, newBytes []byte) []byte {
 buff := make([]byte, 0)
 var startindex int = 0
 for startindex < len(data) {
 index := bytes.Index(data[startindex:], oldBytes)
 if index >= 0 {
 buff = append(buff, data[startindex:index]...)
 buff = append(buff, newBytes...)
 startindex = index + len(oldBytes)
 } else {
 buff = append(buff, data[startindex:]...)
 startindex = len(data)
 }
 }
 return buff
}

调用:

//不包含帧头帧尾
frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e})
frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d})

校验就是简单的异或校验,从消息头到消息体结束,即data[:len(data)-1]

func checkSum(data []byte) byte {
 var sum byte = 0
 for _, itemdata := range data {
 sum ^= itemdata
 }
 return sum
}

调用:

rawcs := checkSum(frameData[:len(frameData)-1])
if rawcs != frameData[len(frameData)-1] {
 return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1])
}

然后就是对frameData中的具体数据进行解析了:

var usedLen int = 0
var msg Message
msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2
msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2
msg.HEADER.Version = frameData[usedLen]
usedLen = usedLen + 1

注意usedLen要跟着实时变化。

手机号固定为10个字节,不足的话前面会填充0,所以我们要把前面无效的0去掉,使用bytes.TrimLeftFunc:

tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 })
msg.HEADER.PhoneNum = string(tempPhone)
usedLen = usedLen + 10
msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2

同时还要对多帧的情况进行判断:

if msg.HEADER.IsMulti() {
 msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
}

再次对usedLen长度判断一下,避免超过界限:

if len(frameData) < usedLen {
 return Message{}, fmt.Errorf("flag code is too short")
}

处理到上面的地方后,接着的就是BODY部分了,直接copy对应的长度,长度为:

len(frameData)-usedLen

逻辑如下

msg.BODY = make([]byte, len(frameData)-usedLen)
copy(msg.BODY, frameData[usedLen:len(frameData)])
usedLen = len(frameData)
return msg, nil

到此正常的流程就走完了。

调用:

msg, err := frameParser(frameData)

当返回错误时,返回的长度值应该为endindex,即不包括endindex处对应的0x7e。因为这个0x7e可能是后面数据的帧头。

msg, err := frameParser(data[startindex+1 : endindex])
if err != nil {
 return Message{}, endindex, err
}
return msg, endindex + 1, nil

对于

if endindex >= 0

条件不符合的,说明没有找到帧尾,可以包帧头前面的去掉了,但是帧头和帧头后面的数据要保留,用来跟之后的数据流拼接。

return Message{}, startindex, fmt.Errorf("can't find end flag")

对于

if startindex >= 0

条件不符合的,说明没有找到帧头,那就是整个帧都是无效的:

return Message{}, len(data), fmt.Errorf("can't find start flag")

这样就实现了一个单帧的过滤器。接着我们在单帧过滤器的基础上来实现多帧过滤器。

我们只需要对数据流进行单帧过滤,然后返回消耗的字节数。如果消耗了一定字节数后,还有剩余的字节,我们再对这些字节进行单帧过滤。依次循环,直到字节数消耗完或者发生错误。

所有循环结束后,我们还需要将剩余的字节数保留,用来跟下一次的数据流进行拼接。函数实现如下:

//Filter is proto Filter api
func Filter(data []byte) ([]Message, int, error) {
 var usedLen int = 0
 msgList := make([]Message, 0)
 var cnt int = 0
 for {
 cnt++
 if cnt > 10 {
 return []Message{}, 0, fmt.Errorf("time too much")
 }
 if usedLen >= len(data) {
 break
 }
 msg, lens, err := filterSigle(data[usedLen:])
 if err != nil {
 usedLen = usedLen + lens
 fmt.Println("err:", err)
 return msgList, usedLen, nil
 }
 usedLen = usedLen + lens
 msgList = append(msgList, msg)
 }
 return msgList, usedLen, nil
}

整个过滤器完整实现:

package proto
import (
 "bytes"
 "fmt"
 "tsp/codec"
 "tsp/utils"
)
const (
 ProtoHeader byte = 0x7e
)
type MultiField struct {
 MsgSum uint16
 MsgIndex uint16
}
type Header struct {
 MID uint16
 Attr uint16
 Version uint8
 PhoneNum string
 SeqNum uint16
 MutilFlag MultiField
}
func (h *Header) IsMulti() bool {
 if ((h.Attr >> 12) & 0x0001) > 0 {
 return true
 }
 return false
}
//BodyLen is a function for get body len
func (h *Header) BodyLen() int {
 return int(h.Attr & 0x03ff)
}
//MakeAttr is generate attr
func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 {
 attr := lens & 0x03FF
 if verFlag > 0 {
 attr = attr & 0x4000
 }
 if mut {
 attr = attr & 0x2000
 }
 encMask := (uint16(enc) & 0x0007) << 10
 return attr + encMask
}
//Message is struct for message for jtt808
type Message struct {
 HEADER Header
 BODY []byte
}
func Version() string {
 return "1.0.0"
}
func Name() string {
 return "jtt808"
}
//Filter is proto Filter api
func Filter(data []byte) ([]Message, int, error) {
 var usedLen int = 0
 msgList := make([]Message, 0)
 var cnt int = 0
 for {
 //添加一个计数器,防止数据异常导致死循环
 cnt++
 if cnt > 10 {
 cnt = 0
 return []Message{}, 0, fmt.Errorf("time too much")
 }
 if usedLen >= len(data) {
 break
 }
 msg, lens, err := filterSigle(data[usedLen:])
 if err != nil {
 usedLen = usedLen + lens
 fmt.Println("err:", err)
 return msgList, usedLen, nil
 }
 usedLen = usedLen + lens
 msgList = append(msgList, msg)
 }
 return msgList, usedLen, nil
}
func filterSigle(data []byte) (Message, int, error) {
 var usedLen int = 0
 startindex := bytes.IndexByte(data, ProtoHeader)
 if startindex >= 0 {
 usedLen = startindex + 1
 endindex := bytes.IndexByte(data[usedLen:], ProtoHeader)
 if endindex >= 0 {
 endindex = endindex + usedLen
 msg, err := frameParser(data[startindex+1 : endindex])
 if err != nil {
 return Message{}, endindex, err
 }
 return msg, endindex + 1, nil
 }
 return Message{}, startindex, fmt.Errorf("can't find end flag")
 }
 return Message{}, len(data), fmt.Errorf("can't find start flag")
}
func Escape(data, oldBytes, newBytes []byte) []byte {
 buff := make([]byte, 0)
 var startindex int = 0
 for startindex < len(data) {
 index := bytes.Index(data[startindex:], oldBytes)
 if index >= 0 {
 buff = append(buff, data[startindex:index]...)
 buff = append(buff, newBytes...)
 startindex = index + len(oldBytes)
 } else {
 buff = append(buff, data[startindex:]...)
 startindex = len(data)
 }
 }
 return buff
}
func frameParser(data []byte) (Message, error) {
 if len(data)+2 < 17+3 {
 return Message{}, fmt.Errorf("header is too short")
 }
 //不包含帧头帧尾
 frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e})
 frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d})
 //之后的操作都是基于frameData来处理
 rawcs := checkSum(frameData[:len(frameData)-1])
 if rawcs != frameData[len(frameData)-1] {
 return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1])
 }
 var usedLen int = 0
 var msg Message
 msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 msg.HEADER.Version = frameData[usedLen]
 usedLen = usedLen + 1
 tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 })
 msg.HEADER.PhoneNum = string(tempPhone)
 usedLen = usedLen + 10
 msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 if msg.HEADER.IsMulti() {
 msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:])
 usedLen = usedLen + 2
 }
 if len(frameData) < usedLen {
 return Message{}, fmt.Errorf("flag code is too short")
 }
 msg.BODY = make([]byte, len(frameData)-usedLen)
 copy(msg.BODY, frameData[usedLen:len(frameData)])
 usedLen = len(frameData)
 return msg, nil
}

有疑问加站长微信联系(非本文作者)

本文来自:Segmentfault

感谢作者:qiuzhiqian

查看原文:一个TCP长连接设备管理后台工程(五)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

关注微信
1046 次点击 ∙ 2 赞
暂无回复
添加一条新回复 (您需要 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传

用户登录

没有账号?注册
(追記) (追記ここまで)

今日阅读排行

    加载中
(追記) (追記ここまで)

一周阅读排行

    加载中

关注我

  • 扫码关注领全套学习资料 关注微信公众号
  • 加入 QQ 群:
    • 192706294(已满)
    • 731990104(已满)
    • 798786647(已满)
    • 729884609(已满)
    • 977810755(已满)
    • 815126783(已满)
    • 812540095(已满)
    • 1006366459(已满)
    • 692541889

  • 关注微信公众号
  • 加入微信群:liuxiaoyan-s,备注入群
  • 也欢迎加入知识星球 Go粉丝们(免费)

给该专栏投稿 写篇新文章

每篇文章有总共有 5 次投稿机会

收入到我管理的专栏 新建专栏