1 // Copyright (C) 2010-2014 David Sugar, Tycho Softworks.
2 // Copyright (C) 2015 Cherokees of Idaho.
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16
18
19 namespace sipwitch {
20
21 static unsigned tpriority = 0;
22 static unsigned baseport = 5062;
23 static bool ipv6 = false;
25 static mutex_t lock;
26 static media::proxy *list = NULL;
27 static fd_set connections;
28 static media::proxy *proxymap[sizeof(connections) * 8];
29 static volatile bool running = false;
30 static volatile int hiwater = 0;
31
32 #ifdef _MSWINDOWS_
33 static unsigned portcount = 0;
34 #else
35 static int control[2];
36 static unsigned portcount = 38;
37 #endif
38
39 static media _proxy;
40 static media::thread *th = NULL;
41
42 static unsigned align(
unsigned value)
43 {
44 return ((value + 1) / 2) * 2;
45 }
46
48 {
49 }
50
52 {
54 th->start(tpriority);
55 }
56
58 {
59 #ifdef _MSWINDOWS_
60 #else
61 char buf[1];
62 if(::write(
control[1], &buf, 1) < 1)
63 shell::log(shell::ERR, "media notify failure");
64 #endif
65 }
66
68 {
69 running = false;
70
72
73 while(!running) {
74 Thread::sleep(100);
75 }
76 }
77
79 {
80 fd_set session;
81 socket_t max;
82
83 shell::log(
DEBUG1,
"starting media thread");
84 running = true;
85 socket_t so;
87 time_t now;
88
89 while(running) {
90 lock.acquire();
91 max = hiwater;
92 memcpy(&session, &connections, sizeof(session));
93 lock.release();
94 select(max, &session, NULL, NULL, NULL);
95 if(!running)
96 break;
97
98 time(&now);
99 for(so = 0; so < max; ++so) {
100 #ifdef _MSWINDOWS_
101 #else
102 char buf[1];
103 if(so ==
control[0] && FD_ISSET(so, &session)) {
104 if(::read(so, buf, 1) < 1)
105 shell::log(shell::ERR, "media control failure");
106 continue;
107 }
108 #endif
109 mp = NULL;
110 if(!FD_ISSET(so, &session))
111 continue;
112
113 lock.acquire();
114 mp = proxymap[so];
115 if(mp->
so == INVALID_SOCKET) {
116 proxymap[so] = NULL;
117 mp = NULL;
118 }
119
122 else if(mp)
124
125 lock.release();
126 }
127 }
128
129 shell::log(
DEBUG1,
"stopping media thread");
130 running = true;
131 }
132
135 {
140 }
141
143 {
144 Socket::release(so);
145 }
146
148 {
149 LinkedObject::release();
150 }
151
153 {
154 char buffer[1024];
155 struct sockaddr_storage where;
156 struct sockaddr *wp = (struct sockaddr *)&where;
157 ssize_t
count = Socket::recvfrom(so, buffer,
sizeof(buffer), 0, &where);
158
159 if(count < 1)
160 return;
161
162 if(Socket::equal(wp, (struct sockaddr *)&local)) {
163 Socket::sendto(so, buffer, count, 0, (struct sockaddr *)&remote);
164 return;
165 }
166 Socket::store(&remote, wp);
167 Socket::sendto(so, buffer, count, 0, (struct sockaddr *)&local);
168 }
169
171 {
172 struct sockaddr *hp = (struct sockaddr *)&local;
173
174 switch(host->sa_family) {
175 #ifdef AF_INET6
176 case AF_INET6:
177 ((struct sockaddr_in6*)(host))->sin6_port =
178 ((struct sockaddr_in6 *)(hp))->sin6_port;
179 break;
180 #endif
181 case AF_INET:
182 ((struct sockaddr_in*)(host))->sin_port =
183 ((struct sockaddr_in *)(hp))->sin_port;
184 }
185 Socket::store(&local, host);
186 }
187
189 {
190 struct sockaddr *iface = parser->
peering;
191 struct sockaddr *host = (struct sockaddr *)&parser->local;
192
194 Socket::store(&local, host);
195
196 switch(iface->sa_family) {
197 #ifdef AF_INET6
198 case AF_INET6:
199 so = Socket::create(AF_INET6, SOCK_DGRAM, 0);
200 ((
struct sockaddr_in6*)(host))->sin6_port = htons(++parser->
mediaport);
201 ((struct sockaddr_in6*)(iface))->sin6_port = htons(port);
202 break;
203 #endif
204 case AF_INET:
205 so = Socket::create(AF_INET, SOCK_DGRAM, 0);
206 ((
struct sockaddr_in*)(host))->sin_port = htons(++parser->
mediaport);
207 ((struct sockaddr_in*)(iface))->sin_port = htons(port);
208 }
209
210 if(so == INVALID_SOCKET)
211 return false;
212
213 memset(&remote, 0, sizeof(remote));
214 Socket::store(&peering, iface);
215 Socket::bindto(so, iface);
216 FD_SET(so, &connections);
217 if(so >= (socket_t)hiwater)
218 hiwater = so + 1;
219 proxymap[so] = this;
220 return true;
221 }
222
224 {
225 expire = expires;
226 if(expire || so == INVALID_SOCKET)
227 return;
228
229 FD_CLR(so, &connections);
230 proxymap[so] = NULL;
231 Socket::release(so);
232 so = INVALID_SOCKET;
233 }
234
236 {
237 outdata = result = NULL;
238 bufdata = NULL;
239 mediacount = 0;
240 mediaport = 0;
241 nat = NULL;
242 memset(&local, 0, sizeof(local));
243 memset(&top, 0, sizeof(local));
244 }
245
247 {
248 set(source, target, len);
249 }
250
252 {
253 linked_pointer<media::proxy> pp = *nat;
254
255 while(is(pp) && mediacount--) {
256 pp->reconnect((struct sockaddr *)&local);
257 pp.next();
258 }
259 memcpy(&local, &top, sizeof(local));
260 }
261
263 {
264 outdata = result = target;
265 bufdata = source;
266 outpos = 0;
267 }
268
270 {
271 char *base = buffer;
272 size_t blen = len;
273
274 // if eod, return NULL
275 if(!bufdata || *bufdata == 0) {
276 *buffer = 0;
277 return NULL;
278 }
279
280 while(len > 1 && *bufdata != 0) {
281 if(*bufdata == '\r') {
282 ++bufdata;
283 continue;
284 }
285 else if(*bufdata == '\n') {
286 ++bufdata;
287 break;
288 }
289 *(buffer++) = *(bufdata++);
290 --len;
291 }
292 *buffer = 0;
293 check_connect(base, blen);
294
295 if(!result)
296 return NULL;
297
298 return base;
299 }
300
302 {
303 char *cp, *ep, *sp;
304 char tmp[128];
305 char mtype[32];
306 unsigned tport;
307 unsigned tcount = 1;
309
310 if(strnicmp(buffer, "m=", 2))
311 return;
312
313 cp = sp = strchr(buffer, ' ');
314 if(!cp)
315 return;
316
317 while(isspace(*cp))
318 ++cp;
319
320 tport = atoi(cp);
321 if(!tport)
322 return;
323
324 ep = strchr(cp, '/');
325 if(ep)
326 tcount = atoi(ep + 1);
327
328 // at the moment we can only do rtp/rtcp pairs...
329 if(tcount > 2) {
330 result = NULL;
331 return;
332 }
333
334 mediacount = tcount;
335 tcount = align(tcount);
336
337 ep = strchr(cp, ' ');
338 if(!ep)
339 ep = (char *)"";
340 else while(isspace(*ep))
341 ++ep;
342
343 mediaport = tport;
344 mediacount = tcount;
345 tport = 0;
346
347 lock.acquire();
348 String::set(tmp, sizeof(tmp), ep);
349 while(tcount--) {
351 if(!pp) {
352 result = NULL;
353 lock.release();
354 return;
355 }
356 if(!tport)
357 tport = (pp->
port / 2) * 2;
358 }
359 lock.release();
360
361 *sp = 0;
362 String::set(mtype, sizeof(mtype), buffer);
363 if(mediacount > 1)
364 snprintf(buffer, len, "%s %u/%u %s",
365 mtype, tport, mediacount, tmp);
366 else
367 snprintf(buffer, len, "%s %u %s",
368 mtype, tport, tmp);
369
370 mediacount = align(mediacount);
371 }
372
374 {
375 char *cp = buffer + 4;
376 char *ap;
377 char ttl[16];
378 const struct sockaddr *hp;
379
380 if(strnicmp(buffer, "c=in", 4))
381 return;
382
383 while(isspace(*cp))
384 ++cp;
385
386 if(ipv6 && strnicmp(cp, "ip6", 3))
387 return;
388 else if(!ipv6 && strnicmp(cp, "ip4", 3))
389 return;
390
391 ap = cp + 3;
392 while(isspace(*ap))
393 ++ap;
394
395 cp = strchr(ap, '/');
396 if(cp) {
397 String::set(ttl, sizeof(ttl), cp);
398 *cp = 0;
399 }
400 else
401 ttl[0] = 0;
402
403 if(!Socket::is_numeric(ap)) {
404 invalid:
405 *cp = '/';
406 return;
407 }
408
409 Socket::address addr(ap);
410 hp = addr.getAddr();
411 if(!hp)
412 goto invalid;
413
414 Socket::store(&local, hp);
415 if(!mediaport)
416 Socket::store(&top, hp);
417 else
418 reconnect();
419
420 Socket::query((struct sockaddr *)&peering, ap, len - 8);
421 String::add(buffer, len, ttl);
422 }
423
425 {
426 size_t outsize = 0;
427
428 if(!outdata)
429 return 0;
430
432 ++outsize;
433 *(outdata++) = *(buffer++);
434 }
435
436 *(outdata++) = '\r';
437 *(outdata++) = '\n';
438 *outdata = 0;
439 return outsize + 2;
440 }
441
444 {
445 }
446
448 {
449 LinkedObject::release();
450 }
451
453 {
454 assert(cfg != NULL);
455
457 return;
458
460
461 linked_pointer<service::keynode> mp = cfg->
getList(
"media");
463
464 while(is(mp)) {
465 key = mp->getId();
466 value = mp->getPointer();
467 if(key && value) {
468 if(!stricmp(key, "port"))
469 baseport = (atoi(value) / 2) * 2;
470 else if(!stricmp(key, "priority"))
471 tpriority = atoi(value);
472 else if(!stricmp(key, "count"))
473 portcount = align(atoi(value));
474 }
475 mp.next();
476 }
477 if(portcount)
478 shell::log(
DEBUG2,
"media proxy configured for %d ports", portcount);
479 else
480 shell::log(
DEBUG1,
"media proxy disabled");
481 }
482
484 {
485 if(portcount)
486 shell::log(
DEBUG1,
"starting media proxy");
487 else
488 return;
489
490 memset(proxymap, 0, sizeof(proxymap));
491 memset(&connections, 0, sizeof(connections));
492
493 #ifdef _MSWINDOWS_
494 #else
496 shell::log(shell::ERR, "media proxy startup failed");
497 return;
498 }
499
500 FD_SET(
control[0], &connections);
502 #endif
503
505
507 }
508
510 {
511 if(portcount)
512 shell::log(
DEBUG1,
"stopping media proxy");
513 else
514 return;
515
517 delete[] list;
518 }
519
521 {
522 ipv6 = true;
523 }
524
526 {
527 time_t now;
528 time(&now);
529
530 linked_pointer<media::proxy> pp = runlist;
531 while(is(pp)) {
532 if(pp->expires && pp->expires < now)
533 pp->release(0);
534
535 if(pp->so == INVALID_SOCKET && !pp->fw) {
536 if(pp->activate(parser))
537 {
538 pp->delist(&runlist);
540 pp->enlist(parser->
nat);
541 return *pp;
542 }
543 else
544 break;
545 }
546 pp.next();
547 }
548 return NULL;
549 }
550
552 {
553 assert(nat != NULL);
554
556 time_t expire = 0;
557
558 if(!*nat)
559 return;
560
561 if(expires) {
562 time(&expire);
563 expire += expires;
564 }
565
566 lock.acquire();
567 linked_pointer<proxy> pp = *nat;
568 while(is(pp)) {
569 member = *pp;
570 pp.next();
572 member->enlist(&runlist);
573 }
574 lock.release();
575
576 *nat = NULL;
577 }
578
579 bool media::isProxied(
const char *source,
const char *target,
struct sockaddr_storage *peering)
580 {
581 assert(source != NULL);
582 assert(target != NULL);
583 assert(peering != NULL);
584
586
587 // if no port count, then proxy is disabled...
588 if(!portcount)
589 return false;
590
591 // if same subnets, then we know is not proxied
592 if(String::equal(source, target))
593 return false;
594
595 // if unknown networks then we cannot proxy...
596 if(String::equal(source, "-") || String::equal(target, "-"))
597 return false;
598
599 // if sdp source is external, we do not need to proxy (one-legged only)
600 // since we assume we can trust external user's public sdp
601 if(String::equal(source, "*"))
602 return false;
603
604 // if external is remote and also we're ipv6, no need to proxy either...
605 if(String::equal(target, "*") && ipv6)
606 return false;
607
608 // if remote, then peering for proxy is public address
609 if(String::equal(target, "*")) {
611 return true;
612 }
613
614 // get subnets from policy name
617
618 if(!src || !dst)
619 goto exit;
620
621 // check by interface to see if same subnet, else to get subnet peering
622 if(!Socket::equal((
struct sockaddr *)(&src->
iface), (
struct sockaddr *)(&dst->
iface))) {
623 memcpy(peering, &dst->
iface,
sizeof(
struct sockaddr_storage));
624 proxy = true;
625 }
626
627 exit:
630 // will become true later...
631 return proxy;
632 }
633
635 {
636 assert(session != NULL);
637 assert(sdpin != NULL);
638
640 struct sockaddr_storage peering;
643
646 else
648
649 // in case we had a nat chain...
650 nat = &target->nat;
652
654 String::set(session->
sdp,
sizeof(session->
sdp), sdpin);
656 }
657
658 shell::log(
DEBUG3,
"reinvite proxied %s to %s", session->
network, target->network);
659 sdp parser(sdpin, session->
sdp,
sizeof(session->
sdp));
660 parser.
peering = (
struct sockaddr *)&peering;
662
664 }
665
667 {
668 assert(session != NULL);
669 assert(sdpin != NULL);
670
674 struct sockaddr_storage peering;
675
676 if(session == target || (cr->
target != NULL && cr->
target != session))
677 return NULL;
678
679 // in case we had a nat chain...
682
684 String::set(session->
sdp,
sizeof(session->
sdp), sdpin);
686 }
687
689 sdp parser(sdpin, session->
sdp,
sizeof(session->
sdp));
690 parser.
peering = (
struct sockaddr *)&peering;
692
694 }
695
697 {
698 assert(session != NULL);
699 assert(target != NULL);
700 assert(nat != NULL);
701
702 *nat = NULL;
703 struct sockaddr_storage peering;
704
706 String::set(sdpout, size, session->
sdp);
707 return sdpout;
708 }
709
710 shell::log(
DEBUG3,
"invite proxied %s to %s", session->
network, target);
711 sdp parser(session->
sdp, sdpout, size);
712 parser.
peering = (
struct sockaddr *)&peering;
714
716 }
717
719 {
720 char buffer[256];
721
722 // simple copy rewrite parser for now....
723 while(NULL != parser->
get(buffer,
sizeof(buffer))) {
724 if(!parser->
put(buffer))
725 return NULL;
726
727 }
729 }
730
731 } // end namespace
char network[MAX_NETWORK_SIZE]
keynode * getList(const char *path)
Server control interfaces and functions.
struct sockaddr_storage iface
static void published(struct sockaddr_storage *peer)
static void release(stack::subnet *access)
static bool is_configured(void)
static stack::subnet * getSubnet(const char *id)
System configuration instance and service functions.
static unsigned short sip_port