diff --git a/common/index.go b/common/index.go index 99e6445..5d07844 100644 --- a/common/index.go +++ b/common/index.go @@ -52,7 +52,9 @@ func (bt *Base) ComputeBPS(bytes int) { bt.ts = time.Now() } } - +func (bt *Base) Drop(count int) { + bt.drops += count +} func (bt *Base) GetBase() *Base { return bt } diff --git a/memory-ts.go b/memory-ts.go index 43578d6..68c92e1 100644 --- a/memory-ts.go +++ b/memory-ts.go @@ -12,7 +12,6 @@ import ( ) type MemoryTs struct { - util.BytesPool PMT util.Buffer util.BLL } @@ -33,7 +32,7 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M err = errors.New("packetStartCodePrefix != 0x000001") return } - pesHeadItem := ts.Get(32) + pesHeadItem := util.GetBLI(32) pesHeadItem.Value.Reset() _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header) if err != nil { @@ -42,7 +41,7 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...) defer pesHeadItem.Recycle() pesPktLength := util.SizeOfBuffers(pesBuffers) - buffer := ts.Get((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength) + buffer := util.GetBLI((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength) bwTsHeader := &buffer.Value bigLen := bwTsHeader.Len() bwTsHeader.Reset() @@ -50,7 +49,7 @@ func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.M var tsHeaderLength int for i := 0; len(pesBuffers)> 0; i++ { if bigLen < mpegts.TS_PACKET_SIZE { - headerItem := ts.Get(mpegts.TS_PACKET_SIZE) + headerItem := util.GetBLI(mpegts.TS_PACKET_SIZE) ts.BLL.Push(headerItem) bwTsHeader = &headerItem.Value bwTsHeader.Reset() diff --git a/publisher-ps.go b/publisher-ps.go index bc96e79..23f557b 100644 --- a/publisher-ps.go +++ b/publisher-ps.go @@ -23,7 +23,6 @@ type PSPublisher struct { // *mpegps.PSDemuxer `json:"-" yaml:"-"` mpegps.DecPSPackage `json:"-" yaml:"-"` reorder util.RTPReorder[*cacheItem] - pool util.BytesPool lastSeq uint16 } @@ -32,19 +31,18 @@ func (p *PSPublisher) PushPS(rtp *rtp.Packet) { if p.Stream == nil { return } - if p.pool == nil { + if p.EsHandler == nil { // p.PSDemuxer = mpegps.NewPSDemuxer() // p.PSDemuxer.OnPacket = p.OnPacket // p.PSDemuxer.OnFrame = p.OnFrame p.EsHandler = p p.lastSeq = rtp.SequenceNumber - 1 - p.pool = make(util.BytesPool, 17) } if p.DisableReorder { p.Feed(rtp.Payload) p.lastSeq = rtp.SequenceNumber } else { - item := p.pool.Get(len(rtp.Payload)) + item := util.GetBLI(len(rtp.Payload)) copy(item.Value, rtp.Payload) for rtpPacket := p.reorder.Push(rtp.SequenceNumber, &cacheItem{rtp.SequenceNumber, item}); rtpPacket != nil; rtpPacket = p.reorder.Pop() { if rtpPacket.Seq != p.lastSeq+1 { diff --git a/publisher.go b/publisher.go index b97b92f..baac44c 100644 --- a/publisher.go +++ b/publisher.go @@ -56,7 +56,7 @@ func (p *Publisher) OnEvent(event any) { } } -func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPool) { +func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL) { if frame.ByteLength < 6 { return } @@ -66,7 +66,7 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo if isExtHeader := b0 & 0b1000_0000; isExtHeader != 0 { fourCC := frame.GetUintN(1, 4) if fourCC == codec.FourCC_H265_32 { - p.VideoTrack = track.NewH265(p.Stream, pool) + p.VideoTrack = track.NewH265(p.Stream) p.VideoTrack.WriteAVCC(ts, frame) } } else { @@ -74,9 +74,9 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo ts = 0 switch codecID := codec.VideoCodecID(b0 & 0x0F); codecID { case codec.CodecID_H264: - p.VideoTrack = track.NewH264(p.Stream, pool) + p.VideoTrack = track.NewH264(p.Stream) case codec.CodecID_H265: - p.VideoTrack = track.NewH265(p.Stream, pool) + p.VideoTrack = track.NewH265(p.Stream) default: p.Stream.Error("video codecID not support", zap.Uint8("codeId", uint8(codecID))) return @@ -91,7 +91,7 @@ func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPo } } -func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPool) { +func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL) { if frame.ByteLength < 4 { return } @@ -102,7 +102,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPo if frame.GetByte(1) != 0 { return } - a := track.NewAAC(p.Stream, pool) + a := track.NewAAC(p.Stream) p.AudioTrack = a a.AVCCHead = []byte{frame.GetByte(0), 1} a.WriteAVCC(0, frame) @@ -112,7 +112,7 @@ func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPo if codecID == codec.CodecID_PCMU { alaw = false } - a := track.NewG711(p.Stream, alaw, pool) + a := track.NewG711(p.Stream, alaw) p.AudioTrack = a a.Audio.SampleRate = uint32(codec.SoundRate[(b0&0x0c)>>2]) if b0&0x02 == 0 { diff --git a/track/aac.go b/track/aac.go index 48e7405..3edd473 100644 --- a/track/aac.go +++ b/track/aac.go @@ -54,14 +54,14 @@ func (aac *AAC) WriteADTS(ts uint32, adts []byte) { aac.generateTimestamp(ts) frameLen := (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5])>> 5) for len(adts)>= frameLen { - aac.Value.AUList.Push(aac.BytesPool.GetShell(adts[7:frameLen])) + aac.Value.AUList.PushShell(adts[7:frameLen]) adts = adts[frameLen:] if len(adts) < 7 { break } frameLen = (int(adts[3]&3) << 11) | (int(adts[4]) << 3) | (int(adts[5])>> 5) } - aac.Value.ADTS = aac.BytesPool.GetShell(adts) + aac.Value.ADTS = util.GetShell(adts) aac.Flush() } @@ -76,7 +76,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { } auHeaderLen := util.ReadBE[int](frame.Payload[:2]) //通常为16,即一个AU Header的长度 if auHeaderLen == 0 { - aac.Value.AUList.Push(aac.BytesPool.GetShell(frame.Payload[:2])) + aac.Value.AUList.PushShell(frame.Payload[:2]) aac.Flush() } else { payload := frame.Payload[2:] @@ -99,7 +99,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { for _, dataLen := range dataLens { if len(payload) < int(dataLen) { aac.fragments = &util.BLL{} - aac.fragments.Push(aac.BytesPool.GetShell(payload)) + aac.fragments.PushShell(payload) // aac.fragments = aac.fragments[:0] // aac.Error("payload is too short 1", zap.Int("dataLen", int(dataLen)), zap.Int("len", len(payload))) return @@ -119,7 +119,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { // aac.Error("payload is too short 2", zap.Int("dataLen", int(dataLens[0])), zap.Int("len", len(payload))) // return // } - aac.fragments.Push(aac.BytesPool.GetShell(payload)) + aac.fragments.PushShell(payload) // aac.fragments = append(aac.fragments, payload[:dataLens[0]]) return } @@ -146,7 +146,7 @@ func (aac *AAC) WriteRTPFrame(frame *RTPFrame) { // } // aac.fragments = append(aac.fragments, payload[:dataLens[0]]) - aac.fragments.Push(aac.BytesPool.GetShell(payload)) + aac.fragments.PushShell(payload) if !frame.Header.Marker { return } diff --git a/track/audio.go b/track/audio.go index bdcfa24..0d49cfd 100644 --- a/track/audio.go +++ b/track/audio.go @@ -48,7 +48,7 @@ func (av *Audio) WriteADTS(pts uint32, adts []byte) { func (av *Audio) Flush() { if av.CodecID == codec.CodecID_AAC && av.Value.ADTS == nil { - item := av.BytesPool.Get(7) + item := util.GetBLI(7) av.ToADTS(av.Value.AUList.ByteLength, item.Value) av.Value.ADTS = item } @@ -58,7 +58,7 @@ func (av *Audio) Flush() { func (av *Audio) WriteRaw(pts uint32, raw []byte) { curValue := &av.Value curValue.BytesIn += len(raw) - curValue.AUList.Push(av.BytesPool.GetShell(raw)) + curValue.AUList.PushShell(raw) av.generateTimestamp(pts) av.Flush() } @@ -70,10 +70,10 @@ func (av *Audio) WriteAVCC(ts uint32, frame *util.BLL) { } func (a *Audio) CompleteAVCC(value *AVFrame) { - value.AVCC.Push(a.BytesPool.GetShell(a.AVCCHead)) + value.AVCC.PushShell(a.AVCCHead) value.AUList.Range(func(v *util.BLL) bool { v.Range(func(v util.Buffer) bool { - value.AVCC.Push(a.BytesPool.GetShell(v)) + value.AVCC.PushShell(v) return true }) return true diff --git a/track/base.go b/track/base.go index a43b771..7ef73ff 100644 --- a/track/base.go +++ b/track/base.go @@ -1,6 +1,7 @@ package track import ( + "sync" "time" "unsafe" @@ -88,9 +89,8 @@ type Media struct { IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染 SSRC uint32 SampleRate uint32 - BytesPool util.BytesPool `json:"-" yaml:"-"` - RtpPool util.Pool[RTPFrame] `json:"-" yaml:"-"` - SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) + RtpPool *util.Pool[util.ListItem[RTPFrame]] `json:"-" yaml:"-"` + SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config) SequenceHeadSeq int RTPDemuxer SpesificTrack `json:"-" yaml:"-"` @@ -104,6 +104,7 @@ func (av *Media) GetRBSize() int { func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) { result = av.RtpPool.Get() + result.Pool = (*sync.Pool)(av.RtpPool) if result.Value.Packet == nil { result.Value.Packet = &rtp.Packet{} result.Value.PayloadType = av.PayloadType @@ -141,12 +142,17 @@ func (av *Media) SetStuff(stuff ...any) { av.Init(v) av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.等待上限 = config.Global.SpeedLimit + if config.Global.EnableRTP { + av.RtpPool = &util.Pool[util.ListItem[RTPFrame]]{ + New: func() any { + return &util.ListItem[RTPFrame]{} + }, + } + } case uint32: av.SampleRate = v case byte: av.PayloadType = v - case util.BytesPool: - av.BytesPool = v case SpesificTrack: av.SpesificTrack = v default: @@ -189,7 +195,7 @@ func (av *Media) WriteSequenceHead(sh []byte) { func (av *Media) AppendAuBytes(b ...[]byte) { var au util.BLL for _, bb := range b { - au.Push(av.BytesPool.GetShell(bb)) + au.Push(util.DefaultBytesPool.GetShell(bb)) } av.Value.AUList.PushValue(&au) } diff --git a/track/g711.go b/track/g711.go index 3128002..952a6fa 100644 --- a/track/g711.go +++ b/track/g711.go @@ -44,9 +44,9 @@ func (g711 *G711) WriteAVCC(ts uint32, frame *util.BLL) error { g711.Error("AVCC data too short", zap.Int("len", l)) return io.ErrShortWrite } - g711.Value.AUList.Push(g711.BytesPool.GetShell(frame.Next.Value[1:])) + g711.Value.AUList.PushShell(frame.Next.Value[1:]) frame.Range(func(v util.Buffer) bool { - g711.Value.AUList.Push(g711.BytesPool.GetShell(v)) + g711.Value.AUList.PushShell(v) return true }) g711.Audio.WriteAVCC(ts, frame) diff --git a/track/h264.go b/track/h264.go index 88bbc34..3054741 100644 --- a/track/h264.go +++ b/track/h264.go @@ -105,8 +105,9 @@ func (vt *H264) WriteAVCC(ts uint32, frame *util.BLL) (err error) { func (vt *H264) WriteRTPFrame(frame *RTPFrame) { if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 { + vt.Drop(int(vt.lastSeq - vt.lastSeq2)) vt.lostFlag = true - vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq), zap.Uint16("lastSeq2", vt.lastSeq2)) + vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq2), zap.Uint16("now", vt.lastSeq)) } rv := &vt.Value if naluType := frame.H264Type(); naluType < 24 { @@ -124,13 +125,14 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { } } case codec.NALU_FUA, codec.NALU_FUB: + // fmt.Println(frame.SequenceNumber, util.Bit1(frame.Payload[1], 0), util.Bit1(frame.Payload[1], 1), frame.Marker) if util.Bit1(frame.Payload[1], 0) { vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)) } if rv.AUList.Pre != nil && rv.AUList.Pre.Value != nil { - rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():])) + rv.AUList.Pre.Value.PushShell(frame.Payload[naluType.Offset():]) } else { - vt.Error("fu have no start") + vt.Warn("fu have no start") return } } diff --git a/track/h265.go b/track/h265.go index ac63d2b..6015901 100644 --- a/track/h265.go +++ b/track/h265.go @@ -131,6 +131,11 @@ func (vt *H265) WriteAVCC(ts uint32, frame *util.BLL) (err error) { } func (vt *H265) WriteRTPFrame(frame *RTPFrame) { + if vt.lastSeq != vt.lastSeq2+1 && vt.lastSeq2 != 0 { + vt.Drop(int(vt.lastSeq - vt.lastSeq2)) + vt.lostFlag = true + vt.Warn("lost rtp packet", zap.Uint16("lastSeq", vt.lastSeq2), zap.Uint16("now", vt.lastSeq)) + } rv := &vt.Value // TODO: DONL may need to be parsed if `sprop-max-don-diff` is greater than 0 on the RTP stream. var usingDonlField bool @@ -165,7 +170,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1]) } if rv.AUList.Pre != nil { - rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer)) + rv.AUList.Pre.Value.PushShell(buffer) } default: vt.WriteSliceBytes(frame.Payload) diff --git a/track/video.go b/track/video.go index f1afbdd..bf5f58c 100644 --- a/track/video.go +++ b/track/video.go @@ -209,7 +209,7 @@ func (vt *Video) SetLostFlag() { vt.lostFlag = true } func (vt *Video) CompleteAVCC(rv *AVFrame) { - mem := vt.BytesPool.Get(5) + mem := util.GetBLI(5) b := mem.Value if rv.IFrame { b[0] = 0x10 | byte(vt.CodecID) @@ -226,12 +226,12 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { // } // var tmp = 0 rv.AUList.Range(func(au *util.BLL) bool { - mem = vt.BytesPool.Get(4) + mem = util.GetBLI(4) // println(au.ByteLength) util.PutBE(mem.Value, uint32(au.ByteLength)) rv.AVCC.Push(mem) au.Range(func(slice util.Buffer) bool { - rv.AVCC.Push(vt.BytesPool.GetShell(slice)) + rv.AVCC.PushShell(slice) return true }) // tmp += 4 + au.ByteLength diff --git a/util/list.go b/util/list.go index a697bd5..f30c981 100644 --- a/util/list.go +++ b/util/list.go @@ -1,5 +1,7 @@ package util +import "sync" + // 带回收功能的泛型双向链表 type IList[T any] interface { @@ -11,7 +13,7 @@ type IList[T any] interface { type ListItem[T any] struct { Value T Next, Pre *ListItem[T] - Pool *List[T] // 回收池 + Pool *sync.Pool // 回收池 list *List[T] } @@ -49,8 +51,9 @@ func (item *ListItem[T]) IsRoot() bool { } func (item *ListItem[T]) Recycle() { - if item.list != item.Pool && item.Pool != nil { - item.Pool.Push(item) + if pool := item.Pool; pool != nil { + item.Pool = nil //防止重复回收 + pool.Put(item) } } @@ -106,13 +109,6 @@ func (p *List[T]) ShiftValue() T { return p.Shift().Value } -func (p *List[T]) PoolShift() (head *ListItem[T]) { - if head = p.Shift(); head == nil { - head = &ListItem[T]{Pool: p} - } - return -} - func (p *List[T]) Shift() (head *ListItem[T]) { if p.Length == 0 { return diff --git a/util/pool.go b/util/pool.go index 0dd7c84..ea3b5f2 100644 --- a/util/pool.go +++ b/util/pool.go @@ -3,6 +3,7 @@ package util import ( "io" "net" + "sync" ) type Recyclable interface { @@ -113,6 +114,10 @@ func (list *BLLs) PushValue(item *BLL) { list.ByteLength += item.ByteLength } +func (list *BLLs) PushShell(b []byte) { + list.Push(GetShell(b)) +} + func (list *BLLs) Push(item *ListItem[Buffer]) { if list == nil { return @@ -174,12 +179,16 @@ func (list *BLL) NewReader() *BLLReader { return &BLLReader{list.Next, 0} } -// func (list *BLL) Concat(list2 BLL) { -// list.Tail.Next = list2.Head -// list.Tail = list2.Tail -// list.Length += list2.Length -// list.ByteLength += list2.ByteLength -// } +// func (list *BLL) Concat(list2 BLL) { +// list.Tail.Next = list2.Head +// list.Tail = list2.Tail +// list.Length += list2.Length +// list.ByteLength += list2.ByteLength +// } + +func (list *BLL) PushShell(b Buffer) { + list.Push(GetShell(b)) +} func (list *BLL) Push(item *ListItem[Buffer]) { if list == nil { @@ -262,45 +271,63 @@ func (list *BLL) GetUintN(index int, n int) (result uint32) { // return // } -type BytesPool []List[Buffer] +type BytesPool []*sync.Pool + +var DefaultBytesPool = make(BytesPool, 17) + +func init() { + for i := 0; i < len(DefaultBytesPool); i++ { + pool := &sync.Pool{} + pool.New = func() interface{} { + return &ListItem[Buffer]{Value: make(Buffer, 1<= size { - if item = p[i].PoolShift(); cap(item.Value)> 0 { - item.Value = item.Value.SubBuf(0, size) - } else { - item.Value = make(Buffer, size, level) - } + mem = p[i].Get().(*ListItem[Buffer]) + mem.Pool = p[i] + mem.Value.Relloc(size) return } } // Pool 中没有就无法回收 - if item == nil { - item = &ListItem[Buffer]{ + if mem == nil { + mem = &ListItem[Buffer]{ Value: make(Buffer, size), } } return } -type Pool[T any] List[T] +type Pool[T any] sync.Pool -func (p *Pool[T]) Get() (item *ListItem[T]) { - if item = (*List[T])(p).PoolShift(); item == nil { - item = &ListItem[T]{ - Pool: (*List[T])(p), - } - } - return +func (p *Pool[T]) Get() (v *T) { + return ((*sync.Pool)(p)).Get().(*T) +} + +func (p *Pool[T]) Put(v *T) { + ((*sync.Pool)(p)).Put(v) }

AltStyle によって変換されたページ (->オリジナル) /