1 /*
2 * UDP prototype streaming system
3 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 /**
23 * @file
24 * UDP protocol
25 */
26
27 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
28
41
42 #if HAVE_PTHREAD_CANCEL
43 #include <pthread.h>
44 #endif
45
46 #ifndef HAVE_PTHREAD_CANCEL
47 #define HAVE_PTHREAD_CANCEL 0
48 #endif
49
50 #ifndef IPV6_ADD_MEMBERSHIP
51 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
52 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
53 #endif
54
55 #define UDP_TX_BUF_SIZE 32768
56 #define UDP_MAX_PKT_SIZE 65536
57
70
71 /* Circular Buffer variables for use in UDP receive code */
75 #if HAVE_PTHREAD_CANCEL
79 int thread_started;
80 #endif
87
88 #define OFFSET(x) offsetof(UDPContext, x)
89 #define D AV_OPT_FLAG_DECODING_PARAM
90 #define E AV_OPT_FLAG_ENCODING_PARAM
/* Option table entries. Each entry gives, in order: the option name, its
 * help text, the offset of the backing UDPContext field (via OFFSET()),
 * the value type, the default value, and two numeric bounds followed by
 * flags -- presumably minimum and maximum per the AVOption layout (TODO
 * confirm against the AVOption declaration). In the flags, D is
 * AV_OPT_FLAG_DECODING_PARAM and E is AV_OPT_FLAG_ENCODING_PARAM. */
/* buffer_size: integer, default 0, bounds 0..INT_MAX, flagged D|E. */
92 {
"buffer_size",
"Socket buffer size in bytes",
OFFSET(buffer_size),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX,
D|
E },
/* reuse: 0/1 switch, default 0, flagged D|E. */
96 {
"reuse",
"Explicitly allow or disallow reusing UDP sockets",
OFFSET(reuse_socket),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1,
D|
E },
/* ttl: default 16; flagged E only. */
97 {
"ttl",
"Set the time to live value (for multicast only)",
OFFSET(ttl),
AV_OPT_TYPE_INT, {.i64 = 16}, 0, INT_MAX,
E },
/* connect: 0/1 switch stored in is_connected, default 0, flagged D|E. */
98 {
"connect",
"Should connect() be called on socket",
OFFSET(is_connected),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1,
D|
E },
99 /* TODO 'sources', 'block' option */
/* fifo_size: stored in circular_buffer_size, default 7*4096; flagged D only. */
100 {
"fifo_size",
"Set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes",
OFFSET(circular_buffer_size),
AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX,
D },
/* overrun_nonfatal: 0/1 switch, default 0; flagged D only. */
101 {
"overrun_nonfatal",
"Survive in case of UDP receiving circular buffer overrun",
OFFSET(overrun_nonfatal),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1,
D },
/* timeout: default 0; flagged D only. The unit of the interval is not
 * stated in this table. */
102 {
"timeout",
"In read mode: if no data arrived in more than this time interval, raise error",
OFFSET(timeout),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX,
D },
104 };
105
111 };
112
114 {
115 char errbuf[100];
117 av_log(ctx, level,
"%s: %s\n", prefix, errbuf);
118 }
119
121 struct sockaddr *addr)
122 {
/* Apply mcastTTL to sockfd using the socket option that matches the
 * address family of addr. Returns -1 as soon as a setsockopt() call
 * fails and 0 otherwise, including when the family matches neither
 * branch or the relevant option is not compiled in. */
123 #ifdef IP_MULTICAST_TTL
/* IPv4: the value is set through IP_MULTICAST_TTL. */
124 if (addr->sa_family == AF_INET) {
125 if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
127 return -1;
128 }
129 }
130 #endif
131 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
/* IPv6: the same value is set through IPV6_MULTICAST_HOPS. */
132 if (addr->sa_family == AF_INET6) {
133 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
135 return -1;
136 }
137 }
138 #endif
139 return 0;
140 }
141
143 {
/* Join the multicast group whose address is held in addr on socket
 * sockfd. Returns -1 on a failed request and 0 otherwise (also when
 * addr's family matches no compiled-in branch). */
144 #ifdef IP_ADD_MEMBERSHIP
145 if (addr->sa_family == AF_INET) {
146 struct ip_mreq mreq;
147
/* Group address comes from addr; INADDR_ANY names no specific local
 * interface. */
148 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
149 mreq.imr_interface.s_addr= INADDR_ANY;
150 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
152 return -1;
153 }
154 }
155 #endif
156 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
157 if (addr->sa_family == AF_INET6) {
158 struct ipv6_mreq mreq6;
159
/* Copy the 128-bit group address and use interface index 0.
 * NOTE(review): the call that submits mreq6 is not visible in this
 * excerpt -- presumably a setsockopt() with IPV6_ADD_MEMBERSHIP; confirm. */
160 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
161 mreq6.ipv6mr_interface= 0;
164 return -1;
165 }
166 }
167 #endif
168 return 0;
169 }
170
172 {
/* Leave the multicast group whose address is held in addr on socket
 * sockfd. The request is built the same way as for joining, with the
 * DROP option. Returns -1 on a failed request and 0 otherwise. */
173 #ifdef IP_DROP_MEMBERSHIP
174 if (addr->sa_family == AF_INET) {
175 struct ip_mreq mreq;
176
/* Group address comes from addr; INADDR_ANY names no specific local
 * interface. */
177 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
178 mreq.imr_interface.s_addr= INADDR_ANY;
179 if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
181 return -1;
182 }
183 }
184 #endif
185 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
186 if (addr->sa_family == AF_INET6) {
187 struct ipv6_mreq mreq6;
188
/* Copy the 128-bit group address and use interface index 0.
 * NOTE(review): the call that submits mreq6 is not visible in this
 * excerpt -- presumably a setsockopt() with IPV6_DROP_MEMBERSHIP; confirm. */
189 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
190 mreq6.ipv6mr_interface= 0;
193 return -1;
194 }
195 }
196 #endif
197 return 0;
198 }
199
201 int type,
int family,
int flags)
202 {
/* Resolve hostname and port through getaddrinfo() and return the
 * resulting address list (res).
 *  - service defaults to "0" and becomes the decimal text of port only
 *    when port > 0;
 *  - node stays null, i.e. no host is passed, when hostname is NULL,
 *    empty, or begins with '?'.
 * The handling of a getaddrinfo() failure is not visible in this
 * excerpt. */
203 struct addrinfo hints = { 0 }, *res = 0;
204 int error;
205 char sport[16];
206 const char *node = 0, *service = "0";
207
208 if (port > 0) {
209 snprintf(sport,
sizeof(sport),
"%d", port);
210 service = sport;
211 }
/* The empty-string test must compare against the NUL terminator '\0';
 * the previous literal was a mis-encoded rendering of it. */
212 if ((hostname) && (hostname[0] != '\0') && (hostname[0] != '?')) {
213 node = hostname;
214 }
218 if ((error =
getaddrinfo(node, service, &hints, &res))) {
221 }
222
223 return res;
224 }
225
227 int addr_len, char **sources,
228 int nb_sources, int include)
229 {
230 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
231 /* These are available in the Microsoft SDK, but don't seem to work
232 * as on Linux, so just prefer the v4-only approach there for now. */
233 int i;
234 for (i = 0; i < nb_sources; i++) {
235 struct group_source_req mreqs;
236 int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
238 SOCK_DGRAM, AF_UNSPEC,
240 if (!sourceaddr)
242
243 mreqs.gsr_interface = 0;
244 memcpy(&mreqs.gsr_group, addr, addr_len);
247
248 if (setsockopt(sockfd, level,
249 include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
250 (const void *)&mreqs, sizeof(mreqs)) < 0) {
251 if (include)
253 else
256 }
257 }
258 #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
259 int i;
260 if (addr->sa_family != AF_INET) {
262 "Setting multicast sources only supported for IPv4\n");
264 }
265 for (i = 0; i < nb_sources; i++) {
266 struct ip_mreq_source mreqs;
268 SOCK_DGRAM, AF_UNSPEC,
270 if (!sourceaddr)
272 if (sourceaddr->
ai_addr->sa_family != AF_INET) {
275 sources[i]);
277 }
278
279 mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
280 mreqs.imr_interface.s_addr = INADDR_ANY;
281 mreqs.imr_sourceaddr.s_addr = ((
struct sockaddr_in *)sourceaddr->
ai_addr)->sin_addr.s_addr;
283
284 if (setsockopt(sockfd, IPPROTO_IP,
285 include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
286 (const void *)&mreqs, sizeof(mreqs)) < 0) {
287 if (include)
289 else
292 }
293 }
294 #else
296 #endif
297 return 0;
298 }
300 const char *hostname, int port)
301 {
303 int addr_len;
304
306 if (res0 == 0)
return AVERROR(EIO);
310
311 return addr_len;
312 }
313
315 socklen_t *addr_len, const char *localaddr)
316 {
317 int udp_fd = -1;
319 int family = AF_UNSPEC;
320
321 if (((
struct sockaddr *) &s->
dest_addr)->sa_family)
322 family = ((
struct sockaddr *) &s->
dest_addr)->sa_family;
325 if (res0 == 0)
326 goto fail;
327 for (res = res0; res; res=res->
ai_next) {
328 udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
329 if (udp_fd != -1) break;
331 }
332
333 if (udp_fd < 0)
334 goto fail;
335
336 memcpy(addr, res->ai_addr, res->ai_addrlen);
337 *addr_len = res->ai_addrlen;
338
340
341 return udp_fd;
342
343 fail:
344 if (udp_fd >= 0)
346 if(res0)
348 return -1;
349 }
350
352 {
353 char sbuf[sizeof(int)*3+1];
354 int error;
355
358 return -1;
359 }
360
361 return strtol(sbuf,
NULL, 10);
362 }
363
364
365 /**
366 * If no filename is given to av_open_input_file because you want to
367 * get the local port first, then you must call this function to set
368 * the remote server address.
369 *
370 * url syntax: udp://host:port[?option=val...]
371 * option: 'ttl=n' : set the ttl value (for multicast only)
372 * 'localport=n' : set the local port
373 * 'pkt_size=n' : set max packet size
374 * 'reuse=1' : enable reusing the socket
375 * 'overrun_nonfatal=1': survive in case of circular buffer overrun
376 *
377 * @param h media file context
378 * @param uri of the remote server
379 * @return zero if no error.
380 */
382 {
384 char hostname[256], buf[10];
385 int port;
386 const char *p;
387
389
390 /* set the destination address */
394 }
396 p = strchr(uri, '?');
397 if (p) {
407 }
408 }
409 }
410 }
411
412 return 0;
413 }
414
415 /**
416 * Return the local port used by the UDP connection
417 * @param h media file context
418 * @return the local port number
419 */
421 {
424 }
425
426 /**
427 * Return the udp file handle for select() usage to wait for several RTP
428 * streams at the same time.
429 * @param h media file context
430 */
432 {
435 }
436
437 #if HAVE_PTHREAD_CANCEL
438 static void *circular_buffer_task( void *_URLContext)
439 {
442 int old_cancelstate;
443
444 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
450 }
451 while(1) {
453
455 /* Blocking operations are always cancellation points;
456 see "General Information" / "Thread Cancelation Overview"
457 in Single Unix. */
458 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
460 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
462 if (len < 0) {
466 }
467 continue;
468 }
470
472 /* No Space left */
475 "Surviving due to overrun_nonfatal option\n");
476 continue;
477 } else {
479 "To avoid, increase fifo_size URL option. "
480 "To survive in such case, use overrun_nonfatal option\n");
483 }
484 }
487 }
488
493 }
494 #endif
495
496 /* put it in UDP context */
497 /* return non zero if error */
499 {
500 char hostname[1024], localaddr[1024] = "";
501 int port, udp_fd = -1, tmp, bind_ret = -1;
503 int is_output;
504 const char *p;
505 char buf[256];
508 int reuse_specified = 0;
509 int i, include = 0, num_sources = 0;
510 char *sources[32];
511
513
517
518 p = strchr(uri, '?');
519 if (p) {
523 /* assume if no digits were found it is a request to enable it */
524 if (buf == endptr)
526 reuse_specified = 1;
527 }
531 /* assume if no digits were found it is a request to enable it */
532 if (buf == endptr)
536 "'overrun_nonfatal' option was set but it is not supported "
537 "on this build (pthread support is required)\n");
538 }
540 s->
ttl = strtol(buf,
NULL, 10);
541 }
544 }
547 }
550 }
553 }
558 "'circular_buffer_size' option was set but it is not supported "
559 "on this build (pthread support is required)\n");
560 }
562 av_strlcpy(localaddr, buf,
sizeof(localaddr));
563 }
565 include = 1;
/* Split the comma-separated list held in buf in place: each comma is
 * overwritten with a NUL terminator so the piece before it becomes its
 * own C string, which is then duplicated into sources[]. A failed
 * duplication jumps to fail. The condition guarding the final break is
 * on a line not visible in this excerpt. */
567 char *source_start;
568
569 source_start = buf;
570 while (1) {
571 char *next = strchr(source_start, ',');
572 if (next)
/* Terminate the current entry with '\0'; the previous literal was a
 * mis-encoded rendering of the NUL character. */
573 *next = '\0';
574 sources[num_sources] =
av_strdup(source_start);
575 if (!sources[num_sources])
576 goto fail;
/* Advance past the separator to the start of the next entry. */
577 source_start = next + 1;
578 num_sources++;
580 break;
581 }
582 }
585 }
586 /* handling needed to support options picking from both AVOption and URL */
590
591 /* fill the dest addr */
593
594 /* XXX: fix av_url_split */
/* A hostname that is empty (first byte is the NUL terminator) or starts
 * with '?' counts as "no hostname". The previous literal was a
 * mis-encoded rendering of '\0'. The conditions guarding each
 * "goto fail" are on lines not visible in this excerpt. */
595 if (hostname[0] == '\0' || hostname[0] == '?') {
596 /* only accepts null hostname if input */
598 goto fail;
599 } else {
601 goto fail;
602 }
603
607 if (udp_fd < 0)
608 goto fail;
609
610 /* Follow the requested reuse option, unless it's multicast in which
611 * case enable reuse unless explicitly disabled.
612 */
616 goto fail;
617 }
618
619 /* If multicast, try binding the multicast address first, to avoid
620 * receiving UDP packets from other sources aimed at the same UDP
621 * port. This fails on windows. This makes sending to the same address
622 * using sendto() fail, so only do it if we're opened in read-only mode. */
624 bind_ret = bind(udp_fd,(
struct sockaddr *)&s->
dest_addr, len);
625 }
626 /* bind to the local address if not multicast or if the multicast
627 * bind failed */
628 /* the bind is needed to give a port to the socket now */
629 if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
631 goto fail;
632 }
633
634 len = sizeof(my_addr);
635 getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
637
640 /* output */
642 goto fail;
643 }
645 /* input */
646 if (num_sources == 0 || !include) {
648 goto fail;
649
650 if (num_sources) {
652 goto fail;
653 }
654 } else if (include && num_sources) {
656 goto fail;
657 } else {
659 goto fail;
660 }
661 }
662 }
663
664 if (is_output) {
665 /* limit the tx buf size to limit latency */
667 if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
669 goto fail;
670 }
671 } else {
672 /* set udp recv buffer size to the largest possible udp packet size to
673 * avoid losing data on OSes that set this too low by default. */
675 if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
677 }
678 /* make the socket non-blocking */
680 }
684 goto fail;
685 }
686 }
687
688 for (i = 0; i < num_sources; i++)
690
692
693 #if HAVE_PTHREAD_CANCEL
695 int ret;
696
697 /* start the task going */
700 if (ret != 0) {
702 goto fail;
703 }
705 if (ret != 0) {
707 goto cond_fail;
708 }
710 if (ret != 0) {
712 goto thread_fail;
713 }
714 s->thread_started = 1;
715 }
716 #endif
717
718 return 0;
719 #if HAVE_PTHREAD_CANCEL
720 thread_fail:
722 cond_fail:
724 #endif
725 fail:
726 if (udp_fd >= 0)
729 for (i = 0; i < num_sources; i++)
732 }
733
735 {
737 int ret;
739
740 #if HAVE_PTHREAD_CANCEL
743 do {
745 if (avail) { // >=size) {
747
750 if(avail > size){
753 }
754
758 return avail;
762 return err;
763 } else if(nonblock) {
766 }
/* Blocking path: wait on the condition variable until the time held in
 * t, then force the next loop iteration onto the non-blocking path. */
767 else {
768 /* FIXME: using the monotonic clock would be better,
769 but it does not exist on all supported platforms. */
/* t is split as microseconds: whole seconds go to tv_sec, the remainder
 * is scaled to nanoseconds (t itself is defined on a line not visible
 * here). */
771 struct timespec tv = { .tv_sec = t / 1000000,
772 .tv_nsec = (t % 1000000) * 1000 };
/* NOTE(review): POSIX specifies that pthread_cond_timedwait() returns 0
 * or a positive error number (e.g. ETIMEDOUT) and does not set errno, so
 * this "< 0" test looks like it can never be true and errno may be stale
 * here. Confirm, and consider testing the return value directly. */
773 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
775 return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
776 }
/* Only one timed wait is attempted: with nonblock set, the enclosing
 * do/while takes the "else if(nonblock)" branch if data is still
 * missing on the next pass. */
777 nonblock = 1;
778 }
779 } while( 1);
780 }
781 #endif
782
785 if (ret < 0)
786 return ret;
787 }
788 ret = recv(s->
udp_fd, buf, size, 0);
789
791 }
792
794 {
796 int ret;
797
800 if (ret < 0)
801 return ret;
802 }
803
805 ret = sendto (s->
udp_fd, buf, size, 0,
808 } else
809 ret = send(s->
udp_fd, buf, size, 0);
810
812 }
813
815 {
817 int ret;
818
822 #if HAVE_PTHREAD_CANCEL
823 if (s->thread_started) {
824 pthread_cancel(s->circular_buffer_thread);
826 if (ret != 0)
830 }
831 #endif
833 return 0;
834 }
835
844 .priv_data_class = &udp_context_class,
846 };