ccRTP 2.1.2: incqueue.cpp Source File

ccRTP 2.1.2
incqueue.cpp
Go to the documentation of this file.
1 // Copyright (C) 2001-2015 Federico Montesino Pouzols <fedemp@altern.org>
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 // As a special exception, you may use this file as part of a free software
18 // library without restriction. Specifically, if other files instantiate
19 // templates or use macros or inline functions from this file, or you compile
20 // this file and link it with other files to produce an executable, this
21 // file does not by itself cause the resulting executable to be covered by
22 // the GNU General Public License. This exception does not however
23 // invalidate any other reasons why the executable file might be covered by
24 // the GNU General Public License.
25 //
26 // This exception applies only to the code released under the name GNU
27 // ccRTP. If you copy code from other releases into a copy of GNU
28 // ccRTP, as the General Public License permits, the exception does
29 // not apply to the code that you add in this way. To avoid misleading
30 // anyone as to the status of such modified files, you must delete
31 // this exception notice from them.
32 //
33 // If you write modifications of your own for GNU ccRTP, it is your choice
34 // whether to permit this exception to apply to your modifications.
35 // If you do not wish that, delete this exception notice.
36 //
37 
38 #include "private.h"
39 #include <ccrtp/iqueue.h>
40 
41 NAMESPACE_COMMONCPP
42 
43 const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534;
44 
45 ConflictHandler::ConflictingTransportAddress::
46  ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp):
47 networkAddress(na), dataTransportPort(dtp),
48 controlTransportPort(ctp), next(NULL)
49 {
50  SysTime::gettimeofday(&lastPacketTime,NULL);
51 }
52 
53 ConflictHandler::ConflictingTransportAddress*
54  ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp)
55 {
56  ConflictingTransportAddress* result = firstConflict;
57  while ( result->networkAddress != na ||
58  result->dataTransportPort != dtp)
59  result = result->next;
60  return result;
61 }
62 
63 ConflictHandler::ConflictingTransportAddress*
64  ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp)
65 {
66  ConflictingTransportAddress* result = firstConflict;
67  while ( result &&
68  (result->networkAddress != na ||
69  result->controlTransportPort != ctp) )
70  result = result->next;
71  return result;
72 }
73 
74 void
75  ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp)
76 {
77  ConflictingTransportAddress* nc =
78  new ConflictingTransportAddress(na,dtp,ctp);
79 
80  if ( lastConflict ) {
81  lastConflict->setNext(nc);
82  lastConflict = nc;
83  } else {
84  firstConflict = lastConflict = nc;
85  }
86 }
87 
88 const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0;
89 const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0;
90 const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000;
91 const size_t IncomingDataQueue::defaultMembersSize =
92 MembershipBookkeeping::defaultMembersHashSize;
93 
94  IncomingDataQueue::IncomingDataQueue(uint32 size) :
95 IncomingDataQueueBase(), MembershipBookkeeping(size)
96 {
97  recvFirst = recvLast = NULL;
98  sourceExpirationPeriod = 5; // 5 RTCP report intervals
99  minValidPacketSequence = getDefaultMinValidPacketSequence();
100  maxPacketDropout = getDefaultMaxPacketDropout();
101  maxPacketMisorder = getDefaultMaxPacketMisorder();
102 }
103 
104 void
105  IncomingDataQueue::purgeIncomingQueue()
106 {
107  IncomingRTPPktLink* recvnext;
108  // flush the reception queue (incoming packets not yet
109  // retrieved)
110  recvLock.writeLock();
111  while( recvFirst )
112  {
113  recvnext = recvFirst->getNext();
114 
115  // nullify source specific packet list
116  SyncSourceLink *s = recvFirst->getSourceLink();
117  s->setFirst(NULL);
118  s->setLast(NULL);
119 
120  delete recvFirst->getPacket();
121  delete recvFirst;
122  recvFirst = recvnext;
123  }
124  recvLock.unlock();
125 }
126 
127 void
128  IncomingDataQueue::renewLocalSSRC()
129 {
130  const uint32 MAXTRIES = 20;
131  uint32 newssrc;
132  uint16 tries = 0;
133  do {
134  newssrc = random32();
135  tries++;
136  } while ( (tries < MAXTRIES) && isRegistered(newssrc) );
137 
138  if ( MAXTRIES == tries ) {
139  // TODO we are in real trouble.
140  }
141 }
142 
143 bool
144  IncomingDataQueue::isWaiting(const SyncSource* src) const
145 {
146  bool w;
147  recvLock.readLock();
148  if ( NULL == src )
149  w = ( NULL != recvFirst);
150  else
151  w = isMine(*src) && ( NULL != getLink(*src)->getFirst() );
152 
153  recvLock.unlock();
154  return w;
155 }
156 
157 uint32
158  IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const
159 {
160  recvLock.readLock();
161 
162  // get the first packet
163  IncomingRTPPktLink* packetLink;
164  if ( NULL == src )
165  packetLink = recvFirst;
166  else
167  packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL;
168 
169  // get the timestamp of the first packet
170  uint32 ts;
171  if ( packetLink )
172  ts = packetLink->getTimestamp();
173  else
174  ts = 0l;
175 
176  recvLock.unlock();
177  return ts;
178 }
179 
180 size_t
181  IncomingDataQueue::takeInDataPacket(void)
182 {
183  InetHostAddress network_address;
184  tpport_t transport_port;
185 
186  uint32 nextSize = (uint32)getNextDataPacketSize();
187  unsigned char* buffer = new unsigned char[nextSize];
188  int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port);
189  if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){
190  delete [] buffer;
191  return 0;
192  }
193 
194  // get time of arrival
195  struct timeval recvtime;
196  SysTime::gettimeofday(&recvtime,NULL);
197 
198  // Special handling of padding to take care of encrypted content.
199  // In case of SRTP the padding length field is also encrypted, thus
200  // it gives a wrong length. Check and clear padding bit before
201  // creating the RTPPacket. Will be set and re-computed after a possible
202  // SRTP decryption.
203  uint8 padSet = (*buffer & 0x20);
204  if (padSet) {
205  *buffer = *buffer & ~0x20; // clear padding bit
206  }
207  // build a packet. It will link itself to its source
208  IncomingRTPPkt* packet =
209  new IncomingRTPPkt(buffer,rtn);
210 
211  // Generic header validity check.
212  if ( !packet->isHeaderValid() ) {
213  delete packet;
214  return 0;
215  }
216 
217  CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC());
218  if (pcc == NULL) {
219  pcc = getInQueueCryptoContext(0);
220  if (pcc != NULL) {
221  pcc = pcc->newCryptoContextForSSRC(packet->getSSRC(), 0, 0L);
222  if (pcc != NULL) {
223  pcc->deriveSrtpKeys(0);
224  setInQueueCryptoContext(pcc);
225  }
226  }
227  }
228  if (pcc != NULL) {
229  int32 ret = packet->unprotect(pcc);
230  if (ret < 0) {
231  if (!onSRTPPacketError(*packet, ret)) {
232  delete packet;
233  return 0;
234  }
235  }
236  }
237  if (padSet) {
238  packet->reComputePayLength(true);
239  }
240  // virtual for profile-specific validation and processing.
241  if ( !onRTPPacketRecv(*packet) ) {
242  delete packet;
243  return 0;
244  }
245 
246  bool source_created;
247  SyncSourceLink* sourceLink =
248  getSourceBySSRC(packet->getSSRC(),source_created);
249  SyncSource* s = sourceLink->getSource();
250  if ( source_created ) {
251  // Set data transport address.
252  setDataTransportPort(*s,transport_port);
253  // Network address is assumed to be the same as the control one
254  setNetworkAddress(*s,network_address);
255  sourceLink->initStats();
256  // Keep first sequence number for stats
257  sourceLink->setBaseSeqNum(packet->getSeqNum());
258  // First packet arrival time.
259  sourceLink->setInitialDataTime(recvtime);
260  sourceLink->setProbation(getMinValidPacketSequence());
261  if ( sourceLink->getHello() )
262  onNewSyncSource(*s);
263  } else if ( 0 == s->getDataTransportPort() ) {
264  // Test if RTCP packets had been received but this is the
265  // first data packet from this source.
266  setDataTransportPort(*s,transport_port);
267  }
268 
269  // Before inserting in the queue,
270  // 1) check for collisions and loops. If the packet cannot be
271  // assigned to a source, it will be rejected.
272  // 2) check the source is a sufficiently well known source
273  // TODO: also check CSRC identifiers.
274  if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created,
275  network_address,transport_port) &&
276  recordReception(*sourceLink,*packet,recvtime) ) {
277  // now the packet link is linked in the queues
278  IncomingRTPPktLink* packetLink =
279  new IncomingRTPPktLink(packet,
280  sourceLink,
281  recvtime,
282  packet->getTimestamp() -
283  sourceLink->getInitialDataTimestamp(),
284  NULL,NULL,NULL,NULL);
285  insertRecvPacket(packetLink);
286  } else {
287  // must be discarded due to collision or loop or
288  // invalid source
289  delete packet;
290  }
291 
292  // ccRTP keeps packets from the new source, but avoids
293  // flip-flopping. This allows losing less packets and for
294  // mobile telephony applications or other apps that may change
295  // the source transport address during the session.
296  return rtn;
297 }
298 
299  bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
300 bool is_new, InetAddress& network_address, tpport_t transport_port)
301 {
302  bool result = true;
303 
304  // Test if the source is new and it is not the local one.
305  if ( is_new &&
306  sourceLink.getSource()->getID() != getLocalSSRC() )
307  return result;
308 
309  SyncSource *s = sourceLink.getSource();
310 
311  if ( s->getDataTransportPort() != transport_port ||
312  s->getNetworkAddress() != network_address ) {
313  // SSRC collision or a loop has happened
314  if ( s->getID() != getLocalSSRC() ) {
315  // TODO: Optional error counter.
316 
317  // Note this differs from the default in the RFC.
318  // Discard packet only when the collision is
319  // repeating (to avoid flip-flopping)
320  if ( sourceLink.getPrevConflict() &&
321  (
322  (network_address ==
323  sourceLink.getPrevConflict()->networkAddress)
324  &&
325  (transport_port ==
326  sourceLink.getPrevConflict()->dataTransportPort)
327  ) ) {
328  // discard packet and do not flip-flop
329  result = false;
330  } else {
331  // Record who has collided so that in
332  // the future we can how if the
333  // collision repeats.
334  sourceLink.setPrevConflict(network_address,
335  transport_port,0);
336  // Change sync source transport address
337  setDataTransportPort(*s,transport_port);
338  setNetworkAddress(*s,network_address);
339  }
340 
341  } else {
342  // Collision or loop of own packets.
343  ConflictingTransportAddress* conflicting =
344  searchDataConflict(network_address,
345  transport_port);
346  if ( conflicting ) {
347  // Optional error counter.
348  updateConflict(*conflicting);
349  result = false;
350  } else {
351  // New collision
352  addConflict(s->getNetworkAddress(),
353  s->getDataTransportPort(),
354  s->getControlTransportPort());
355  dispatchBYE("SSRC collision detected when receiving data packet.");
356  renewLocalSSRC();
357  setNetworkAddress(*s,network_address);
358  setDataTransportPort(*s,transport_port);
359  setControlTransportPort(*s,0);
360  sourceLink.initStats();
361  sourceLink.setProbation(getMinValidPacketSequence());
362  }
363  }
364  }
365  return result;
366 }
367 
368 bool
369  IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink)
370 {
371  SyncSourceLink *srcLink = packetLink->getSourceLink();
372  unsigned short seq = packetLink->getPacket()->getSeqNum();
373  recvLock.writeLock();
374  IncomingRTPPktLink* plink = srcLink->getLast();
375  if ( plink && (seq < plink->getPacket()->getSeqNum()) ) {
376  // a disordered packet, so look for its place
377  while ( plink && (seq < plink->getPacket()->getSeqNum()) ){
378  // the packet is a duplicate
379  if ( seq == plink->getPacket()->getSeqNum() ) {
380  recvLock.unlock();
381  VDL(("Duplicated disordered packet: seqnum %d, SSRC:",
382  seq,srcLink->getSource()->getID()));
383  delete packetLink->getPacket();
384  delete packetLink;
385  return false;
386  }
387  plink = plink->getSrcPrev();
388  }
389  if ( !plink ) {
390  // we have scanned the whole (and non empty)
391  // list, so this must be the older (first)
392  // packet from this source.
393 
394  // insert into the source specific queue
395  IncomingRTPPktLink* srcFirst = srcLink->getFirst();
396  srcFirst->setSrcPrev(packetLink);
397  packetLink->setSrcNext(srcFirst);
398  // insert into the global queue
399  IncomingRTPPktLink* prevFirst = srcFirst->getPrev();
400  if ( prevFirst ){
401  prevFirst->setNext(packetLink);
402  packetLink->setPrev(prevFirst);
403  }
404  srcFirst->setPrev(packetLink);
405  packetLink->setNext(srcFirst);
406  srcLink->setFirst(packetLink);
407  } else {
408  // (we are in the middle of the source list)
409  // insert into the source specific queue
410  plink->getSrcNext()->setSrcPrev(packetLink);
411  packetLink->setSrcNext(plink->getSrcNext());
412  // -- insert into the global queue, with the
413  // minimum priority compared to packets from
414  // other sources
415  plink->getSrcNext()->getPrev()->setNext(packetLink);
416  packetLink->setPrev(plink->getSrcNext()->getPrev());
417  plink->getSrcNext()->setPrev(packetLink);
418  packetLink->setNext(plink->getSrcNext());
419  // ------
420  plink->setSrcNext(packetLink);
421  packetLink->setSrcPrev(plink);
422  // insert into the global queue (giving
423  // priority compared to packets from other sources)
424  //list->getNext->setPrev(packetLink);
425  //packetLink->setNext(list->getNext);
426  //list->setNext(packet);
427  //packet->setPrev(list);
428  }
429  } else {
430  // An ordered packet
431  if ( !plink ) {
432  // the only packet in the source specific queue
433  srcLink->setLast(packetLink);
434  srcLink->setFirst(packetLink);
435  // the last packet in the global queue
436  if ( recvLast ) {
437  recvLast->setNext(packetLink);
438  packetLink->setPrev(recvLast);
439  }
440  recvLast = packetLink;
441  if ( !recvFirst )
442  recvFirst = packetLink;
443  } else {
444  // there are already more packets from this source.
445  // this ignores duplicate packets
446  if ( plink && (seq == plink->getPacket()->getSeqNum()) ) {
447  VDL(("Duplicated packet: seqnum %d, SSRC:",
448  seq,srcLink->getSource->getID()));
449  recvLock.unlock();
450  delete packetLink->getPacket();
451  delete packetLink;
452  return false;
453  }
454  // the last packet in the source specific queue
455  srcLink->getLast()->setSrcNext(packetLink);
456  packetLink->setSrcPrev(srcLink->getLast());
457  srcLink->setLast(packetLink);
458  // the last packet in the global queue
459  recvLast->setNext(packetLink);
460  packetLink->setPrev(recvLast);
461  recvLast = packetLink;
462  }
463  }
464  // account the insertion of this packet into the queue
465  srcLink->recordInsertion(*packetLink);
466  recvLock.unlock();
467  // packet successfully inserted
468  return true;
469 }
470 
471 const AppDataUnit*
472  IncomingDataQueue::getData(uint32 stamp, const SyncSource* src)
473 {
474  IncomingRTPPktLink* pl;
475 // unsigned count = 0;
476  AppDataUnit* result;
477 
478  if ( NULL != (pl = getWaiting(stamp,src)) ) {
479  IncomingRTPPkt* packet = pl->getPacket();
480 // size_t len = packet->getPayloadSize();
481 
482  SyncSource &src = *(pl->getSourceLink()->getSource());
483  result = new AppDataUnit(*packet,src);
484 
485  // delete the packet link, but not the packet
486  delete pl;
487 // count += len;
488  } else {
489  result = NULL;
490  }
491  return result;
492 }
493 
494 // FIX: try to merge and organize
495 IncomingDataQueue::IncomingRTPPktLink*
496  IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src)
497 {
498  if ( src && !isMine(*src) )
499  return NULL;
500 
501  IncomingRTPPktLink *result;
502  recvLock.writeLock();
503  if ( src != NULL ) {
504  // process source specific queries:
505  // we will modify the queue of this source
506  SyncSourceLink* srcm = getLink(*src);
507 
508  // first, delete all older packets. The while loop
509  // down here counts how many older packets are there;
510  // then the for loop deletes them and advances l till
511  // the first non older packet.
512  int nold = 0;
513  IncomingRTPPktLink* l = srcm->getFirst();
514  if ( !l ) {
515  result = NULL;
516  recvLock.unlock();
517  return result;
518  }
519  while ( l && ((l->getTimestamp() < timestamp) ||
520  end2EndDelayed(*l))) {
521  nold++;
522  l = l->getSrcNext();
523  }
524  // to know whether the global queue gets empty
525  bool nonempty = false;
526  for ( int i = 0; i < nold; i++) {
527  l = srcm->getFirst();
528  srcm->setFirst(srcm->getFirst()->getSrcNext());;
529  // unlink from the global queue
530  nonempty = false;
531  if ( l->getPrev() ){
532  nonempty = true;
533  l->getPrev()->setNext(l->getNext());
534  } if ( l->getNext() ) {
535  nonempty = true;
536  l->getNext()->setPrev(l->getPrev());
537  }
538  // now, delete it
539  onExpireRecv(*(l->getPacket()));// notify packet discard
540  delete l->getPacket();
541  delete l;
542  }
543  // return the packet, if found
544  if ( !srcm->getFirst() ) {
545  // threre are no more packets from this source
546  srcm->setLast(NULL);
547  if ( !nonempty )
548  recvFirst = recvLast = NULL;
549  result = NULL;
550  } else if ( srcm->getFirst()->getTimestamp() > timestamp ) {
551  // threre are only newer packets from this source
552  srcm->getFirst()->setSrcPrev(NULL);
553  result = NULL;
554  } else {
555  // (src->getFirst()->getTimestamp() == stamp) is true
556  result = srcm->getFirst();
557  // unlink the selected packet from the global queue
558  if ( result->getPrev() )
559  result->getPrev()->setNext(result->getNext());
560  else
561  recvFirst = result->getNext();
562  if ( result->getNext() )
563  result->getNext()->setPrev(result->getPrev());
564  else
565  recvLast = result->getPrev();
566  // unlink the selected packet from the source queue
567  srcm->setFirst(result->getSrcNext());
568  if ( srcm->getFirst() )
569  srcm->getFirst()->setPrev(NULL);
570  else
571  srcm->setLast(NULL);
572  }
573  } else {
574  // process source unspecific queries
575  int nold = 0;
576  IncomingRTPPktLink* l = recvFirst;
577  while ( l && (l->getTimestamp() < timestamp ||
578  end2EndDelayed(*l) ) ){
579  nold++;
580  l = l->getNext();
581  }
582  for (int i = 0; i < nold; i++) {
583  IncomingRTPPktLink* l = recvFirst;
584  recvFirst = recvFirst->getNext();
585  // unlink the packet from the queue of its source
586  SyncSourceLink* src = l->getSourceLink();
587  src->setFirst(l->getSrcNext());
588  if ( l->getSrcNext() )
589  l->getSrcNext()->setSrcPrev(NULL);
590  else
591  src->setLast(NULL);
592  // now, delete it
593  onExpireRecv(*(l->getPacket()));// notify packet discard
594  delete l->getPacket();
595  delete l;
596  }
597 
598  // return the packet, if found
599  if ( !recvFirst ) {
600  // there are no more packets in the queue
601  recvLast = NULL;
602  result = NULL;
603  } else if ( recvFirst->getTimestamp() > timestamp ) {
604  // there are only newer packets in the queue
605  if (l)
606  l->setPrev(NULL);
607  result = NULL;
608  } else {
609  // (recvFirst->getTimestamp() == stamp) is true
610  result = recvFirst;
611  // unlink the selected packet from the global queue
612  recvFirst = recvFirst->getNext();
613  if ( recvFirst )
614  recvFirst->setPrev(NULL);
615  else
616  recvLast = NULL;
617  // unlink the selected packet from the queue
618  // of its source
619  SyncSourceLink* src = result->getSourceLink();
620  src->setFirst(result->getSrcNext());
621  if ( src->getFirst() )
622  src->getFirst()->setSrcPrev(NULL);
623  else
624  src->setLast(NULL);
625  }
626  }
627  recvLock.unlock();
628  return result;
629 }
630 
631 bool
632  IncomingDataQueue::recordReception(SyncSourceLink& srcLink,
633 const IncomingRTPPkt& pkt, const timeval recvtime)
634 {
635  bool result = true;
636 
637  // Source validation.
638  SyncSource* src = srcLink.getSource();
639  if ( !(srcLink.isValid()) ) {
640  // source is not yet valid.
641  if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) {
642  // packet in sequence.
643  srcLink.decProbation();
644  if ( srcLink.isValid() ) {
645  // source has become valid.
646  // TODO: avoid this the first time.
647  srcLink.initSequence(pkt.getSeqNum());
648  } else {
649  result = false;
650  }
651  } else {
652  // packet not in sequence.
653  srcLink.probation = getMinValidPacketSequence() - 1;
654  result = false;
655  }
656  srcLink.setMaxSeqNum(pkt.getSeqNum());
657  } else {
658  // source was already valid.
659  uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum();
660  if ( step < getMaxPacketDropout() ) {
661  // Ordered, with not too high step.
662  if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) {
663  // sequene number wrapped.
664  srcLink.incSeqNumAccum();
665  }
666  srcLink.setMaxSeqNum(pkt.getSeqNum());
667  } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) {
668  // too high step of the sequence number.
669  if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) {
670  srcLink.initSequence(pkt.getSeqNum());
671  } else {
672  srcLink.setBadSeqNum((pkt.getSeqNum() + 1) &
673  (SEQNUMMOD - 1) );
674  //This additional check avoids that
675  //the very first packet from a source
676  //be discarded.
677  if ( 0 < srcLink.getObservedPacketCount() ) {
678  result = false;
679  } else {
680  srcLink.setMaxSeqNum(pkt.getSeqNum());
681  }
682  }
683  } else {
684  // duplicate or reordered packet
685  }
686  }
687 
688  if ( result ) {
689  // the packet is considered valid.
690  srcLink.incObservedPacketCount();
691  srcLink.incObservedOctetCount(pkt.getPayloadSize());
692  srcLink.lastPacketTime = recvtime;
693  if ( srcLink.getObservedPacketCount() == 1 ) {
694  // ooops, it's the first packet from this source
695  setSender(*src,true);
696  srcLink.setInitialDataTimestamp(pkt.getTimestamp());
697  }
698  // we record the last time a packet from this source
699  // was received, this has statistical interest and is
700  // needed to time out old senders that are no sending
701  // any longer.
702 
703  // compute the interarrival jitter estimation.
704  timeval tarrival;
705  timeval lastT = srcLink.getLastPacketTime();
706  timeval initial = srcLink.getInitialDataTime();
707  timersub(&lastT,&initial,&tarrival);
708  uint32 arrival = timeval2microtimeout(tarrival)
709  * getCurrentRTPClockRate();
710  uint32 transitTime = arrival - pkt.getTimestamp();
711  int32 delta = transitTime -
712  srcLink.getLastPacketTransitTime();
713  srcLink.setLastPacketTransitTime(transitTime);
714  if ( delta < 0 )
715  delta = -delta;
716  srcLink.setJitter( srcLink.getJitter() +
717  (1.0f / 16.0f) *
718  (static_cast<float>(delta) -
719  srcLink.getJitter()));
720  }
721  return result;
722 }
723 
724 void
725  IncomingDataQueue::recordExtraction(const IncomingRTPPkt&)
726 {}
727 
728 void
729  IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc)
730 {
731  std::list<CryptoContext *>::iterator i;
732 
733  MutexLock lock(cryptoMutex);
734  // check if a CryptoContext for a SSRC already exists. If yes
735  // remove it from list before inserting the new one.
736  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ) {
737  if( (*i)->getSsrc() == cc->getSsrc() ) {
738  CryptoContext* tmp = *i;
739  cryptoContexts.erase(i);
740  delete tmp;
741  break;
742  }
743  }
744  cryptoContexts.push_back(cc);
745 }
746 
747 void
748  IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc)
749 {
750  std::list<CryptoContext *>::iterator i;
751 
752  MutexLock lock(cryptoMutex);
753  if (cc == NULL) { // Remove any incoming crypto contexts
754  for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) {
755  CryptoContext* tmp = *i;
756  i = cryptoContexts.erase(i);
757  delete tmp;
758  }
759  }
760  else {
761  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
762  if( (*i)->getSsrc() == cc->getSsrc() ) {
763  CryptoContext* tmp = *i;
764  cryptoContexts.erase(i);
765  delete tmp;
766  return;
767  }
768  }
769  }
770 }
771 
772 CryptoContext*
773  IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc)
774 {
775  std::list<CryptoContext *>::iterator i;
776 
777  MutexLock lock(cryptoMutex);
778  for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){
779  if( (*i)->getSsrc() == ssrc) {
780  return (*i);
781  }
782  }
783  return NULL;
784 }
785 
786 END_NAMESPACE
787 
IncomingDataQueue::onRTPPacketRecv
virtual bool onRTPPacketRecv(IncomingRTPPkt &)
A virtual function to support parsing of arriving packets to determine if they should be kept in the ...
Definition: iqueue.h:1206
ConflictHandler::ConflictingTransportAddress::next
ConflictingTransportAddress * next
Definition: iqueue.h:255
IncomingDataQueue::defaultMaxPacketDropout
static const uint16 defaultMaxPacketDropout
Definition: iqueue.h:1283
IncomingDataQueue::getWaiting
IncomingDataQueue::IncomingRTPPktLink * getWaiting(uint32 timestamp, const SyncSource *src=NULL)
This is used to fetch a packet in the receive queue and to expire packets older than the current time...
Definition: incqueue.cpp:496
MembershipBookkeeping::SEQNUMMOD
static const uint32 SEQNUMMOD
Definition: iqueue.h:855
ConflictHandler::searchDataConflict
ConflictingTransportAddress * searchDataConflict(InetAddress na, tpport_t dtp)
Definition: incqueue.cpp:54
RTPQueueBase::dispatchBYE
virtual size_t dispatchBYE(const std::string &)
A plugin point for posting of BYE messages.
Definition: queuebase.h:228
SyncSource::getDataTransportPort
tpport_t getDataTransportPort() const
Definition: sources.h:271
IncomingDataQueue::cryptoMutex
Mutex cryptoMutex
Definition: iqueue.h:1289
SyncSource
Synchronization source in an RTP session.
Definition: sources.h:192
IncomingDataQueue::recvLock
ThreadLock recvLock
Definition: iqueue.h:1277
AppDataUnit
Interface (envelope) to data received over RTP packets.
Definition: queuebase.h:68
CryptoContext
The implementation for a SRTP cryptographic context.
Definition: CryptoContext.h:82
SyncSourceHandler::setSender
void setSender(SyncSource &source, bool active)
Definition: iqueue.h:150
IncomingDataQueue::renewLocalSSRC
void renewLocalSSRC()
Definition: incqueue.cpp:128
random32
uint32 random32()
Definition: queue.cpp:492
RTPPacket::reComputePayLength
void reComputePayLength(bool padding)
Re-compute payload length.
Definition: rtppkt.cpp:178
IncomingDataQueue::maxPacketMisorder
uint16 maxPacketMisorder
Definition: iqueue.h:1285
IncomingRTPPkt
RTP packets received from other participants.
Definition: rtppkt.h:704
IncomingDataQueue::recordExtraction
void recordExtraction(const IncomingRTPPkt &pkt)
Log extraction of a packet from this source from the scheduled reception queue.
Definition: incqueue.cpp:725
IncomingDataQueue::defaultMembersSize
static const size_t defaultMembersSize
Definition: iqueue.h:1287
IncomingDataQueue::end2EndDelayed
virtual bool end2EndDelayed(IncomingRTPPktLink &)
Definition: iqueue.h:1238
MembershipBookkeeping::isRegistered
bool isRegistered(uint32 ssrc)
Returns whether there is already a synchronizacion source with "ssrc" SSRC identifier.
Definition: members.cpp:205
timeval2microtimeout
microtimeout_t timeval2microtimeout(const timeval &t)
Convert a time interval, expressed as a timeval value into a microseconds counter.
Definition: base.h:90
IncomingDataQueue::maxPacketDropout
uint16 maxPacketDropout
Definition: iqueue.h:1286
IncomingDataQueue::recvData
virtual size_t recvData(unsigned char *buffer, size_t length, InetHostAddress &host, tpport_t &port)=0
This function performs the physical I/O for reading a packet from the source.
IncomingDataQueue::checkSSRCInIncomingRTPPkt
bool checkSSRCInIncomingRTPPkt(SyncSourceLink &sourceLink, bool is_new, InetAddress &na, tpport_t tp)
Apply collision and loop detection and correction algorithm when receiving RTP data packets...
Definition: incqueue.cpp:299
IncomingDataQueue::onNewSyncSource
virtual void onNewSyncSource(const SyncSource &)
Virtual called when a new synchronization source has joined the session.
Definition: iqueue.h:1185
IncomingDataQueue::minValidPacketSequence
uint8 minValidPacketSequence
Definition: iqueue.h:1284
IncomingDataQueueBase::getMaxRecvPacketSize
size_t getMaxRecvPacketSize() const
Definition: queuebase.h:302
IncomingDataQueue::getMinValidPacketSequence
uint8 getMinValidPacketSequence() const
Get the minimun number of consecutive packets that must be received from a source before accepting it...
Definition: iqueue.h:1010
IncomingRTPPkt::getSSRC
uint32 getSSRC() const
Get synchronization source numeric identifier.
Definition: rtppkt.h:740
private.h
Declaration of ccRTP internal stuff.
IncomingDataQueue::getMaxPacketMisorder
uint16 getMaxPacketMisorder() const
Definition: iqueue.h:1022
IncomingRTPPkt::unprotect
int32 unprotect(CryptoContext *pcc)
Unprotect a received packet.
Definition: rtppkt.cpp:294
IncomingDataQueue::defaultMaxPacketMisorder
static const uint16 defaultMaxPacketMisorder
Definition: iqueue.h:1282
VDL
#define VDL(e)
Definition: private.h:110
IncomingDataQueue::getDefaultMaxPacketMisorder
uint16 getDefaultMaxPacketMisorder() const
Definition: iqueue.h:1018
RTPPacket::getTimestamp
uint32 getTimestamp() const
Definition: rtppkt.h:149
IncomingRTPPkt::isHeaderValid
bool isHeaderValid()
Get validity of this packet.
Definition: rtppkt.h:730
IncomingDataQueue::sourceExpirationPeriod
uint8 sourceExpirationPeriod
Definition: iqueue.h:1288
ConflictHandler::ConflictingTransportAddress::setNext
void setNext(ConflictingTransportAddress *nc)
Definition: iqueue.h:240
IncomingDataQueue::IncomingDataQueue
IncomingDataQueue(uint32 size)
Definition: incqueue.cpp:94
IncomingDataQueueBase::defaultMaxRecvPacketSize
static const size_t defaultMaxRecvPacketSize
Definition: queuebase.h:328
iqueue.h
Generic RTP input queues.
SyncSourceHandler::setNetworkAddress
void setNetworkAddress(SyncSource &source, InetAddress addr)
Definition: iqueue.h:162
CryptoContext::newCryptoContextForSSRC
CryptoContext * newCryptoContextForSSRC(uint32 ssrc, int roc, int64 keyDerivRate)
Derive a new Crypto Context for use with a new SSRC.
Definition: CryptoContext.cpp:484
IncomingDataQueue::isWaiting
bool isWaiting(const SyncSource *src=NULL) const
Determine if packets are waiting in the reception queue.
Definition: incqueue.cpp:144
MembershipBookkeeping::isMine
bool isMine(const SyncSource &source) const
Get whether a synchronization source is recorded in this membership controller.
Definition: iqueue.h:342
IncomingDataQueue::recordReception
bool recordReception(SyncSourceLink &srcLink, const IncomingRTPPkt &pkt, const timeval recvtime)
Log reception of a new RTP packet from this source.
Definition: incqueue.cpp:632
MembershipBookkeeping::defaultMembersHashSize
static const size_t defaultMembersHashSize
Definition: iqueue.h:854
RTPPacket::getSeqNum
uint16 getSeqNum() const
Definition: rtppkt.h:142
IncomingDataQueue::getInQueueCryptoContext
CryptoContext * getInQueueCryptoContext(uint32 ssrc)
Get an input queue CryptoContext identified by SSRC.
Definition: incqueue.cpp:773
SyncSourceHandler::setDataTransportPort
void setDataTransportPort(SyncSource &source, tpport_t p)
Definition: iqueue.h:154
IncomingDataQueue::insertRecvPacket
bool insertRecvPacket(IncomingRTPPktLink *packetLink)
Insert a just received packet in the queue (both general and source specific queues).
Definition: incqueue.cpp:369
IncomingDataQueue::getData
const AppDataUnit * getData(uint32 stamp, const SyncSource *src=NULL)
Retreive data from a specific timestamped packet if such a packet is currently available in the recei...
Definition: incqueue.cpp:472
SyncSource::getNetworkAddress
const InetAddress & getNetworkAddress() const
Definition: sources.h:277
IncomingDataQueue::recvFirst
IncomingRTPPktLink * recvFirst
Definition: iqueue.h:1279
SyncSource::getControlTransportPort
tpport_t getControlTransportPort() const
Definition: sources.h:274
MembershipBookkeeping::getSourceBySSRC
SyncSourceLink * getSourceBySSRC(uint32 ssrc, bool &created)
Get the description of a source by its ssrc identifier.
Definition: members.cpp:226
IncomingDataQueue::takeInDataPacket
virtual size_t takeInDataPacket()
This function is used by the service thread to process the next incoming packet and place it in the r...
Definition: incqueue.cpp:181
IncomingDataQueue::purgeIncomingQueue
void purgeIncomingQueue()
Definition: incqueue.cpp:105
IncomingDataQueue::defaultMinValidPacketSequence
static const uint8 defaultMinValidPacketSequence
Definition: iqueue.h:1281
RTPPacket::getPayloadSize
uint32 getPayloadSize() const
Definition: rtppkt.h:128
RTPQueueBase::getLocalSSRC
uint32 getLocalSSRC() const
Definition: queuebase.h:184
IncomingDataQueue::getMaxPacketDropout
uint16 getMaxPacketDropout() const
Definition: iqueue.h:1039
IncomingDataQueue::getDefaultMaxPacketDropout
uint16 getDefaultMaxPacketDropout() const
Definition: iqueue.h:1035
CryptoContext::deriveSrtpKeys
void deriveSrtpKeys(uint64 index)
Perform key derivation according to SRTP specification.
Definition: CryptoContext.cpp:337
CryptoContext::getSsrc
uint32 getSsrc() const
Get the SSRC of this SRTP Cryptograhic context.
Definition: CryptoContext.h:329
MembershipBookkeeping
Controls the group membership in the current session.
Definition: iqueue.h:298
IncomingDataQueue::recvLast
IncomingRTPPktLink * recvLast
Definition: iqueue.h:1279
ConflictHandler::lastConflict
ConflictingTransportAddress * lastConflict
Definition: iqueue.h:285
RTPQueueBase::getCurrentRTPClockRate
uint32 getCurrentRTPClockRate() const
Get the clock rate in RTP clock units (for instance, 8000 units per second for PCMU, or 90000 units per second for MP2T).
Definition: queuebase.h:195
ConflictHandler::ConflictingTransportAddress::ConflictingTransportAddress
ConflictingTransportAddress(InetAddress na, tpport_t dtp, tpport_t ctp)
Definition: incqueue.cpp:46
IncomingDataQueue::getFirstTimestamp
uint32 getFirstTimestamp(const SyncSource *src=NULL) const
Get timestamp of first packet waiting in the queue.
Definition: incqueue.cpp:158
IncomingDataQueue::onExpireRecv
virtual void onExpireRecv(IncomingRTPPkt &)
A hook to filter packets in the receive queue that are being expired.
Definition: iqueue.h:1217
SyncSource::getID
uint32 getID() const
Definition: sources.h:257
ConflictHandler::addConflict
void addConflict(const InetAddress &na, tpport_t dtp, tpport_t ctp)
Definition: incqueue.cpp:75
MembershipBookkeeping::getLink
SyncSourceLink * getLink(const SyncSource &source) const
Definition: iqueue.h:336
IncomingDataQueue::onSRTPPacketError
virtual bool onSRTPPacketError(IncomingRTPPkt &pkt, int32 errorCode)
A hook that gets called if the decoding of an incoming SRTP was erroneous.
Definition: iqueue.h:1234
ConflictHandler::updateConflict
void updateConflict(ConflictingTransportAddress &ca)
Definition: iqueue.h:273
IncomingDataQueue::getDefaultMinValidPacketSequence
uint8 getDefaultMinValidPacketSequence() const
Definition: iqueue.h:1002
IncomingDataQueue::removeInQueueCryptoContext
void removeInQueueCryptoContext(CryptoContext *cc)
Remove input queue CryptoContext.
Definition: incqueue.cpp:748
IncomingDataQueue::getNextDataPacketSize
virtual size_t getNextDataPacketSize() const =0
IncomingDataQueue::cryptoContexts
std::list< CryptoContext * > cryptoContexts
Definition: iqueue.h:1290
ConflictHandler::firstConflict
ConflictingTransportAddress * firstConflict
Definition: iqueue.h:285
ConflictHandler::searchControlConflict
ConflictingTransportAddress * searchControlConflict(InetAddress na, tpport_t ctp)
Definition: incqueue.cpp:64
IncomingDataQueue::setInQueueCryptoContext
void setInQueueCryptoContext(CryptoContext *cc)
Set input queue CryptoContext.
Definition: incqueue.cpp:729
SyncSourceHandler::setControlTransportPort
void setControlTransportPort(SyncSource &source, tpport_t p)
Definition: iqueue.h:158

Generated on Dec 15, 2017 for ccrtp-2.1.2 (*.h and *.cpp) and libzrtpcpp-2.3.4 (*.h), by   doxygen 1.8.6

AltStyle によって変換されたページ (->オリジナル) /