1 /*
2 * UDP prototype streaming system
3 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 /**
23 * @file
24 * UDP protocol
25 */
26
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
29
45
46 #ifdef __APPLE__
47 #include "TargetConditionals.h"
48 #endif
49
50 #if HAVE_UDPLITE_H
51 #include "udplite.h"
52 #else
53 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
54 * So, we provide a fallback here.
55 */
56 #define UDPLITE_SEND_CSCOV 10
57 #define UDPLITE_RECV_CSCOV 11
58 #endif
59
60 #ifndef IPPROTO_UDPLITE
61 #define IPPROTO_UDPLITE 136
62 #endif
63
64 #if HAVE_W32THREADS
65 #undef HAVE_PTHREAD_CANCEL
66 #define HAVE_PTHREAD_CANCEL 1
67 #endif
68
69 #if HAVE_PTHREAD_CANCEL
71 #endif
72
73 #ifndef IPV6_ADD_MEMBERSHIP
74 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
75 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
76 #endif
77
78 #define UDP_TX_BUF_SIZE 32768
79 #define UDP_RX_BUF_SIZE 393216
80 #define UDP_MAX_PKT_SIZE 65536
81 #define UDP_HEADER_SIZE 8
82
98
99 /* Circular Buffer variables for use in UDP receive code */
103 int64_t
bitrate;
/* number of bits to send per second */
106 #if HAVE_PTHREAD_CANCEL
110 int thread_started;
111 #endif
121
122 #define OFFSET(x) offsetof(UDPContext, x)
123 #define D AV_OPT_FLAG_DECODING_PARAM
124 #define E AV_OPT_FLAG_ENCODING_PARAM
126 {
"buffer_size",
"System data size (in bytes)",
OFFSET(buffer_size),
AV_OPT_TYPE_INT, { .i64 = -1 }, -1, INT_MAX, .flags =
D|
E },
128 {
"burst_bits",
"Max length of bursts in bits (when using bitrate)",
OFFSET(burst_bits),
AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags =
E },
132 {
"udplite_coverage",
"choose UDPLite head size which should be validated by checksum",
OFFSET(udplite_coverage),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX,
D|
E },
133 {
"pkt_size",
"Maximum UDP packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 1472 }, -1, INT_MAX, .flags =
D|
E },
134 {
"reuse",
"explicitly allow reusing UDP sockets",
OFFSET(reuse_socket),
AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1,
D|
E },
135 {
"reuse_socket",
"explicitly allow reusing UDP sockets",
OFFSET(reuse_socket),
AV_OPT_TYPE_BOOL, { .i64 = -1 }, -1, 1, .flags =
D|
E },
136 {
"broadcast",
"explicitly allow or disallow broadcast destination",
OFFSET(is_broadcast),
AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1,
E },
138 {
"connect",
"set if connect() should be called on socket",
OFFSET(is_connected),
AV_OPT_TYPE_BOOL, { .i64 = 0 }, 0, 1, .flags =
D|
E },
139 {
"fifo_size",
"set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes",
OFFSET(circular_buffer_size),
AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX,
D },
140 {
"overrun_nonfatal",
"survive in case of UDP receiving circular buffer overrun",
OFFSET(overrun_nonfatal),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1,
D },
141 {
"timeout",
"set raise error timeout, in microseconds (only in read mode)",
OFFSET(timeout),
AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX,
D },
145 };
146
152 };
153
159 };
160
162 struct sockaddr *addr,
163 void *logctx)
164 {
165 int protocol, cmd;
166
167 /* There is some confusion in the world whether IP_MULTICAST_TTL
168 * takes a byte or an int as an argument.
169 * BSD seems to indicate byte so we are going with that and use
170 * int and fall back to byte to be safe */
171 switch (addr->sa_family) {
172 #ifdef IP_MULTICAST_TTL
173 case AF_INET:
174 protocol = IPPROTO_IP;
175 cmd = IP_MULTICAST_TTL;
176 break;
177 #endif
178 #ifdef IPV6_MULTICAST_HOPS
179 case AF_INET6:
180 protocol = IPPROTO_IPV6;
181 cmd = IPV6_MULTICAST_HOPS;
182 break;
183 #endif
184 default:
185 return 0;
186 }
187
188 if (setsockopt(sockfd, protocol, cmd, &mcastTTL, sizeof(mcastTTL)) < 0) {
189 /* BSD compatibility */
190 unsigned char ttl = (unsigned char) mcastTTL;
191
193 if (setsockopt(sockfd, protocol, cmd, &ttl, sizeof(ttl)) < 0) {
196 }
197 }
198
199 return 0;
200 }
201
203 struct sockaddr *local_addr, void *logctx)
204 {
205 #ifdef IP_ADD_MEMBERSHIP
206 if (addr->sa_family == AF_INET) {
207 struct ip_mreq mreq;
208
209 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
210 if (local_addr)
211 mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
212 else
213 mreq.imr_interface.s_addr = INADDR_ANY;
214 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
217 }
218 }
219 #endif
220 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
221 if (addr->sa_family == AF_INET6) {
222 struct ipv6_mreq mreq6;
223
224 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
225 //TODO: Interface index should be looked up from local_addr
226 mreq6.ipv6mr_interface = 0;
230 }
231 }
232 #endif
233 return 0;
234 }
235
237 struct sockaddr *local_addr, void *logctx)
238 {
239 #ifdef IP_DROP_MEMBERSHIP
240 if (addr->sa_family == AF_INET) {
241 struct ip_mreq mreq;
242
243 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
244 if (local_addr)
245 mreq.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
246 else
247 mreq.imr_interface.s_addr = INADDR_ANY;
248 if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
250 return -1;
251 }
252 }
253 #endif
254 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
255 if (addr->sa_family == AF_INET6) {
256 struct ipv6_mreq mreq6;
257
258 memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
259 //TODO: Interface index should be looked up from local_addr
260 mreq6.ipv6mr_interface = 0;
263 return -1;
264 }
265 }
266 #endif
267 return 0;
268 }
269
271 int sockfd, struct sockaddr *addr,
274 int nb_sources, int include)
275 {
277 if (addr->sa_family != AF_INET) {
278 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
279 /* For IPv4 prefer the old approach, as that alone works reliably on
280 * Windows and it also supports supplying the interface based on its
281 * address. */
283 for (
i = 0;
i < nb_sources;
i++) {
284 struct group_source_req mreqs;
285 int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
286
287 //TODO: Interface index should be looked up from local_addr
288 mreqs.gsr_interface = 0;
289 memcpy(&mreqs.gsr_group, addr, addr_len);
291
292 if (setsockopt(sockfd,
level,
293 include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
294 (const void *)&mreqs, sizeof(mreqs)) < 0) {
295 if (include)
297 else
300 }
301 }
302 return 0;
303 #else
305 "Setting multicast sources only supported for IPv4\n");
307 #endif
308 }
309 #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
310 for (
i = 0;
i < nb_sources;
i++) {
311 struct ip_mreq_source mreqs;
312 if (
sources[
i].ss_family != AF_INET) {
315 }
316
317 mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
318 if (local_addr)
319 mreqs.imr_interface = ((struct sockaddr_in *)local_addr)->sin_addr;
320 else
321 mreqs.imr_interface.s_addr = INADDR_ANY;
322 mreqs.imr_sourceaddr.s_addr = ((
struct sockaddr_in *)&
sources[
i])->sin_addr.s_addr;
323
324 if (setsockopt(sockfd, IPPROTO_IP,
325 include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
326 (const void *)&mreqs, sizeof(mreqs)) < 0) {
327 if (include)
329 else
332 }
333 }
334 #else
336 #endif
337 return 0;
338 }
341 const char *hostname, int port)
342 {
344 int addr_len;
345
347 if (!res0)
return AVERROR(EIO);
351
352 return addr_len;
353 }
354
356 socklen_t *addr_len, const char *localaddr)
357 {
359 int udp_fd = -1;
361 int family = AF_UNSPEC;
362
363 if (((
struct sockaddr *) &
s->dest_addr)->sa_family)
364 family = ((
struct sockaddr *) &
s->dest_addr)->sa_family;
368 if (!res0)
370 for (res = res0; res; res=res->
ai_next) {
371 if (
s->udplite_coverage)
373 else
375 if (udp_fd != -1) break;
377 }
378
379 if (udp_fd < 0)
381
384
386
387 return udp_fd;
388
390 if (udp_fd >= 0)
391 closesocket(udp_fd);
392 if(res0)
394 return -1;
395 }
396
398 {
399 char sbuf[
sizeof(
int)*3+1];
401
404 return -1;
405 }
406
407 return strtol(sbuf,
NULL, 10);
408 }
409
410
411 /**
412 * If no filename is given to av_open_input_file because you want to
413 * get the local port first, then you must call this function to set
414 * the remote server address.
415 *
416 * url syntax: udp://host:port[?option=val...]
417 * option: 'ttl=n' : set the ttl value (for multicast only)
418 * 'localport=n' : set the local port
419 * 'pkt_size=n' : set max packet size
420 * 'reuse=1' : enable reusing the socket
421 * 'overrun_nonfatal=1': survive in case of circular buffer overrun
422 *
423 * @param h media file context
424 * @param uri of the remote server
425 * @return zero if no error.
426 */
428 {
430 char hostname[256], buf[10];
431 int port;
432 const char *p;
433
435
436 /* set the destination address */
438 if (
s->dest_addr_len < 0) {
440 }
442 p = strchr(uri, '?');
443 if (p) {
445 int was_connected =
s->is_connected;
446 s->is_connected = strtol(buf,
NULL, 10);
447 if (
s->is_connected && !was_connected) {
448 if (connect(
s->udp_fd, (
struct sockaddr *) &
s->dest_addr,
453 }
454 }
455 }
456 }
457
458 return 0;
459 }
460
461 /**
462 * Return the local port used by the UDP connection
463 * @param h media file context
464 * @return the local port number
465 */
467 {
469 return s->local_port;
470 }
471
472 /**
473 * Return the udp file handle for select() usage to wait for several RTP
474 * streams at the same time.
475 * @param h media file context
476 */
478 {
481 }
482
483 #if HAVE_PTHREAD_CANCEL
484 static void *circular_buffer_task_rx( void *_URLContext)
485 {
488 int old_cancelstate;
489
491
496 s->circular_buffer_error =
AVERROR(EIO);
497 goto end;
498 }
499 while(1) {
502 socklen_t addr_len = sizeof(addr);
503
505 /* Blocking operations are always cancellation points;
506 see "General Information" / "Thread Cancelation Overview"
507 in Single Unix. */
509 len = recvfrom(
s->udp_fd,
s->tmp+4,
sizeof(
s->tmp)-4, 0, (
struct sockaddr *)&addr, &addr_len);
515 goto end;
516 }
517 continue;
518 }
520 continue;
522
524 /* No Space left */
525 if (
s->overrun_nonfatal) {
527 "Surviving due to overrun_nonfatal option\n");
528 continue;
529 } else {
531 "To avoid, increase fifo_size URL option. "
532 "To survive in such case, use overrun_nonfatal option\n");
533 s->circular_buffer_error =
AVERROR(EIO);
534 goto end;
535 }
536 }
539 }
540
541 end:
545 }
546
547 static void *circular_buffer_task_tx( void *_URLContext)
548 {
553 int64_t sent_bits = 0;
554 int64_t burst_interval =
s->bitrate ? (
s->burst_bits * 1000000 /
s->bitrate) : 0;
555 int64_t max_delay =
s->bitrate ? ((int64_t)
h->max_packet_size * 8 * 1000000 /
s->bitrate + 1) : 0;
556
558
560
563 s->circular_buffer_error =
AVERROR(EIO);
564 goto end;
565 }
566
567 for(;;) {
569 const uint8_t *p;
571 int64_t timestamp;
572
574
577 goto end;
580 }
581
584
587
589
591
594 if (timestamp < target_timestamp) {
595 int64_t delay = target_timestamp - timestamp;
596 if (delay > max_delay) {
597 delay = max_delay;
598 start_timestamp = timestamp + delay;
599 sent_bits = 0;
600 }
602 } else {
603 if (timestamp - burst_interval > target_timestamp) {
604 start_timestamp = timestamp - burst_interval;
605 sent_bits = 0;
606 }
607 }
608 sent_bits +=
len * 8;
609 target_timestamp = start_timestamp + sent_bits * 1000000 /
s->bitrate;
610 }
611
616 if (!
s->is_connected) {
617 ret = sendto (
s->udp_fd, p,
len, 0,
618 (
struct sockaddr *) &
s->dest_addr,
620 } else
621 ret = send(
s->udp_fd, p,
len, 0);
625 } else {
629 s->circular_buffer_error =
ret;
632 }
633 }
634 }
635
637 }
638
639 end:
642 }
643
644
645 #endif
646
647 /* put it in UDP context */
648 /* return non zero if error */
650 {
651 char hostname[1024];
652 int port, udp_fd = -1,
tmp, bind_ret = -1, dscp = -1;
654 int is_output;
655 const char *p;
656 char buf[256];
660
662
664 if (
s->buffer_size < 0)
666
670 }
671
675 }
676
677 p = strchr(uri, '?');
678 if (p) {
681 s->reuse_socket = strtol(buf, &endptr, 10);
682 /* assume if no digits were found it is a request to enable it */
683 if (buf == endptr)
685 }
688 s->overrun_nonfatal = strtol(buf, &endptr, 10);
689 /* assume if no digits were found it is a request to enable it */
690 if (buf == endptr)
691 s->overrun_nonfatal = 1;
692 if (!HAVE_PTHREAD_CANCEL)
694 "'overrun_nonfatal' option was set but it is not supported "
695 "on this build (pthread support is required)\n");
696 }
698 s->ttl = strtol(buf,
NULL, 10);
699 if (
s->ttl < 0 ||
s->ttl > 255) {
703 }
704 }
706 s->udplite_coverage = strtol(buf,
NULL, 10);
707 }
709 s->local_port = strtol(buf,
NULL, 10);
710 }
712 s->pkt_size = strtol(buf,
NULL, 10);
713 }
715 s->buffer_size = strtol(buf,
NULL, 10);
716 }
718 s->is_connected = strtol(buf,
NULL, 10);
719 }
721 dscp = strtol(buf,
NULL, 10);
722 }
724 s->circular_buffer_size = strtol(buf,
NULL, 10);
725 if (!HAVE_PTHREAD_CANCEL)
727 "'circular_buffer_size' option was set but it is not supported "
728 "on this build (pthread support is required)\n");
729 }
731 s->bitrate = strtoll(buf,
NULL, 10);
732 if (!HAVE_PTHREAD_CANCEL)
734 "'bitrate' option was set but it is not supported "
735 "on this build (pthread support is required)\n");
736 }
738 s->burst_bits = strtoll(buf,
NULL, 10);
739 }
743 }
747 }
751 }
753 s->timeout = strtol(buf,
NULL, 10);
755 s->is_broadcast = strtol(buf,
NULL, 10);
756 }
757 /* handling needed to support options picking from both AVOption and URL */
758 s->circular_buffer_size *= 188;
760 h->max_packet_size =
s->pkt_size;
761 } else {
763 }
764 h->rw_timeout =
s->timeout;
765
766 /* fill the dest addr */
768
769 /* XXX: fix av_url_split */
770 if (hostname[0] == '0円' || hostname[0] == '?') {
771 /* only accepts null hostname if input */
775 }
776 } else {
779 }
780
782 s->local_port = port;
783
785 if (udp_fd < 0) {
788 }
789
790 s->local_addr_storage=my_addr;
//store for future multicast join
791
792 /* Follow the requested reuse option, unless it's multicast in which
793 * case enable reuse unless explicitly disabled.
794 */
795 if (
s->reuse_socket > 0 || (
s->is_multicast &&
s->reuse_socket < 0)) {
797 if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(
s->reuse_socket),
sizeof(
s->reuse_socket)) != 0) {
800 }
801 }
802
803 if (
s->is_broadcast) {
804 #ifdef SO_BROADCAST
805 if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(
s->is_broadcast),
sizeof(
s->is_broadcast)) != 0) {
808 }
809 #else
812 #endif
813 }
814
815 /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
816 * The receiver coverage has to be less than or equal to the sender coverage.
817 * Otherwise, the receiver will drop all packets.
818 */
819 if (
s->udplite_coverage) {
822
825 }
826
827 if (dscp >= 0) {
828 dscp <<= 2;
829 if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0) {
832 }
833 }
834
835 /* If multicast, try binding the multicast address first, to avoid
836 * receiving UDP packets from other sources aimed at the same UDP
837 * port. This fails on windows. This makes sending to the same address
838 * using sendto() fail, so only do it if we're opened in read-only mode. */
840 bind_ret = bind(udp_fd,(
struct sockaddr *)&
s->dest_addr,
len);
841 }
842 /* bind to the local address if not multicast or if the multicast
843 * bind failed */
844 /* the bind is needed to give a port to the socket now */
845 if (bind_ret < 0 && bind(udp_fd,(
struct sockaddr *)&my_addr,
len) < 0) {
849 }
850
851 len =
sizeof(my_addr);
852 getsockname(udp_fd, (
struct sockaddr *)&my_addr, &
len);
854
855 if (
s->is_multicast) {
857 /* output */
860 }
862 /* input */
863 if (
s->filters.nb_include_addrs) {
865 (
struct sockaddr *)&
s->dest_addr,
866 s->dest_addr_len, &
s->local_addr_storage,
867 s->filters.include_addrs,
868 s->filters.nb_include_addrs, 1)) < 0)
870 } else {
872 (
struct sockaddr *)&
s->local_addr_storage,
h)) < 0)
874 }
875 if (
s->filters.nb_exclude_addrs) {
877 (
struct sockaddr *)&
s->dest_addr,
878 s->dest_addr_len, &
s->local_addr_storage,
879 s->filters.exclude_addrs,
880 s->filters.nb_exclude_addrs, 0)) < 0)
882 }
883 }
884 }
885
886 if (is_output) {
887 /* limit the tx buf size to limit latency */
888 tmp =
s->buffer_size;
889 if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &
tmp,
sizeof(
tmp)) < 0) {
893 }
894 } else {
895 /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
896 tmp =
s->buffer_size;
897 if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &
tmp,
sizeof(
tmp)) < 0) {
899 }
901 if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &
tmp, &
len) < 0) {
903 } else {
905 if(tmp < s->buffer_size)
906 av_log(
h,
AV_LOG_WARNING,
"attempted to set receive buffer to size %d but it only ended up set as %d\n",
s->buffer_size,
tmp);
907 }
908
909 /* make the socket non-blocking */
911 }
912 if (
s->is_connected) {
913 if (connect(udp_fd, (
struct sockaddr *) &
s->dest_addr,
s->dest_addr_len)) {
917 }
918 }
919
921
922 #if HAVE_PTHREAD_CANCEL
923 /*
924 Create thread in case of:
925 1. Input and circular_buffer_size is set
926 2. Output and bitrate and circular_buffer_size is set
927 */
928
929 if (is_output &&
s->bitrate && !
s->circular_buffer_size) {
930 /* Warn user in case of 'circular_buffer_size' is not set */
931 av_log(
h,
AV_LOG_WARNING,
"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
932 }
933
934 if ((!is_output &&
s->circular_buffer_size) || (is_output &&
s->bitrate &&
s->circular_buffer_size)) {
935 /* start the task going */
940 }
946 }
951 goto cond_fail;
952 }
953 ret =
pthread_create(&
s->circular_buffer_thread,
NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx,
h);
957 goto thread_fail;
958 }
959 s->thread_started = 1;
960 }
961 #endif
962
963 return 0;
964 #if HAVE_PTHREAD_CANCEL
965 thread_fail:
967 cond_fail:
969 #endif
971 if (udp_fd >= 0)
972 closesocket(udp_fd);
976 }
977
979 {
981
982 // set default checksum coverage
984
986 }
987
989 {
993 socklen_t addr_len = sizeof(addr);
994 #if HAVE_PTHREAD_CANCEL
996
999 do {
1001 if (avail) { // >=size) {
1003
1009 }
1010
1014 return avail;
1015 }
else if(
s->circular_buffer_error){
1016 int err =
s->circular_buffer_error;
1018 return err;
1019 } else if(nonblock) {
1022 } else {
1023 /* FIXME: using the monotonic clock would be better,
1024 but it does not exist on all supported platforms. */
1026 struct timespec tv = { .tv_sec = t / 1000000,
1027 .tv_nsec = (t % 1000000) * 1000 };
1029 if (err) {
1031 return AVERROR(err == ETIMEDOUT ? EAGAIN : err);
1032 }
1033 nonblock = 1;
1034 }
1035 } while(1);
1036 }
1037 #endif
1038
1043 }
1044 ret = recvfrom(
s->udp_fd, buf,
size, 0, (
struct sockaddr *)&addr, &addr_len);
1050 }
1051
1053 {
1056
1057 #if HAVE_PTHREAD_CANCEL
1060
1062
1063 /*
1064 Return error if last tx failed.
1065 Here we can't know on which packet error was, but it needs to know that error exists.
1066 */
1067 if (
s->circular_buffer_error<0) {
1068 int err =
s->circular_buffer_error;
1070 return err;
1071 }
1072
1074 /* What about a partial packet tx ? */
1077 }
1084 }
1085 #endif
1090 }
1091
1092 if (!
s->is_connected) {
1093 ret = sendto (
s->udp_fd, buf,
size, 0,
1094 (
struct sockaddr *) &
s->dest_addr,
1096 } else
1097 ret = send(
s->udp_fd, buf,
size, 0);
1098
1100 }
1101
1103 {
1105
1106 #if HAVE_PTHREAD_CANCEL
1107 // Request close once writing is finished
1113 }
1114 #endif
1115
1118 (
struct sockaddr *)&
s->local_addr_storage,
h);
1119 #if HAVE_PTHREAD_CANCEL
1120 if (
s->thread_started) {
1122 // Cancel only read, as write has been signaled as success to the user
1124 #ifdef _WIN32
1125 /* recvfrom() is not a cancellation point for win32, so we shutdown
1126 * the socket and abort pending IO, subsequent recvfrom() calls
1127 * will fail with WSAESHUTDOWN causing the thread to exit. */
1128 shutdown(
s->udp_fd, SD_RECEIVE);
1129 CancelIoEx((HANDLE)(SOCKET)
s->udp_fd,
NULL);
1130 #else
1131 pthread_cancel(
s->circular_buffer_thread);
1132 #endif
1133 }
1139 }
1140 #endif
1141 closesocket(
s->udp_fd);
1144 return 0;
1145 }
1146
1157 };
1158
1169 };