1 /*
2 * Input async protocol.
3 * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * Based on libavformat/cache.c by Michael Niedermayer
22 */
23
24 /**
25 * @TODO
26 * support timeout
27 * support working with concatdec and hls
28 */
29
38 #include <stdint.h>
39
40 #if HAVE_UNISTD_H
41 #include <unistd.h>
42 #endif
43
/*
 * Size constants for this protocol. Their uses are not visible in this
 * view, so the roles noted below are inferred from the names only and the
 * unit is presumably bytes -- TODO confirm against the code that uses them.
 */
/* 4 * 1024 * 1024 (4 Mi); presumably the capacity of the read buffer. */
44 #define BUFFER_CAPACITY (4 * 1024 * 1024)
/* 4 * 1024 * 1024 (4 Mi); presumably how much already-read data is kept for seeking back. */
45 #define READ_BACK_CAPACITY (4 * 1024 * 1024)
/* 256 * 1024 (256 Ki); presumably the distance limit for treating a seek as "short". */
46 #define SHORT_SEEK_THRESHOLD (256 * 1024)
47
49 {
52
55
59
65
69
73
78
82
84 {
89
91 return 0;
92 }
93
95 {
97 }
98
100 {
103 }
104
106 {
108 }
109
111 {
113 }
114
116 {
118
120 if (dest)
123
127 }
128
130 }
131
133 {
137
140 c->inner_io_error =
ret < 0 ?
ret : 0;
141
142 return c->inner_io_error;
143 }
144
146 {
148
153
155 }
156
158 {
160 }
161
163 {
167 return 0;
168 }
169
171 {
174
175 if (
c->abort_request)
176 return 1;
177
179 c->abort_request = 1;
180
181 return c->abort_request;
182 }
183
185 {
191
193
194 while (1) {
195 int fifo_space, to_copy;
196
199 c->io_eof_reached = 1;
203 break;
204 }
205
206 if (
c->seek_request) {
207 seek_ret =
ffurl_seek(
c->inner,
c->seek_pos,
c->seek_whence);
208 if (seek_ret >= 0) {
209 c->io_eof_reached = 0;
212 }
213
214 c->seek_completed = 1;
215 c->seek_ret = seek_ret;
217
218
221 continue;
222 }
223
225 if (
c->io_eof_reached || fifo_space <= 0) {
229 continue;
230 }
232
233 to_copy =
FFMIN(4096, fifo_space);
235
238 c->io_eof_reached = 1;
239 if (
c->inner_io_error < 0)
240 c->io_error =
c->inner_io_error;
241 }
242
245 }
246
248 }
249
251 {
255
257
260 goto fifo_fail;
261
262 /* wrap interrupt callback */
263 c->interrupt_callback =
h->interrupt_callback;
267 goto url_fail;
268 }
269
271 h->is_streamed =
c->inner->is_streamed;
272
277 goto mutex_fail;
278 }
279
284 goto cond_wakeup_main_fail;
285 }
286
291 goto cond_wakeup_background_fail;
292 }
293
298 goto thread_fail;
299 }
300
301 return 0;
302
303 thread_fail:
305 cond_wakeup_background_fail:
307 cond_wakeup_main_fail:
309 mutex_fail:
311 url_fail:
313 fifo_fail:
315 }
316
318 {
321
323 c->abort_request = 1;
326
330
336
337 return 0;
338 }
339
341 {
344 int read_complete = !dest;
347
349
350 while (to_read > 0) {
354 break;
355 }
358 if (to_copy > 0) {
360 if (dest)
361 dest = (uint8_t *)dest + to_copy;
362 c->logical_pos += to_copy;
363 to_read -= to_copy;
365
366 if (to_read <= 0 || !read_complete)
367 break;
368 }
else if (
c->io_eof_reached) {
372 else
374 }
375 break;
376 }
379 }
380
383
385 }
386
388 {
390 }
391
393 {
399 int fifo_size_of_read_back;
400
403 return c->logical_size;
404 } else if (whence == SEEK_CUR) {
406 new_logical_pos =
pos +
c->logical_pos;
407 } else if (whence == SEEK_SET){
409 new_logical_pos =
pos;
410 } else {
412 }
413 if (new_logical_pos < 0)
415
418 if (new_logical_pos ==
c->logical_pos) {
419 /* current position */
420 return c->logical_pos;
421 }
else if ((new_logical_pos >= (
c->logical_pos - fifo_size_of_read_back)) &&
423 int pos_delta = (int)(new_logical_pos -
c->logical_pos);
424 /* fast seek */
426 new_logical_pos, (
int)
c->logical_pos,
427 (
int)(new_logical_pos -
c->logical_pos),
fifo_size);
428
429 if (pos_delta > 0) {
430 // fast seek forwards
432 } else {
433 // fast seek backwards
435 c->logical_pos = new_logical_pos;
436 }
437
438 return c->logical_pos;
439 }
else if (
c->logical_size <= 0) {
440 /* can not seek */
442 }
else if (new_logical_pos >
c->logical_size) {
443 /* beyond end */
445 }
446
448
450 c->seek_pos = new_logical_pos;
451 c->seek_whence = SEEK_SET;
452 c->seek_completed = 0;
454
455 while (1) {
458 break;
459 }
460 if (
c->seek_completed) {
461 if (
c->seek_ret >= 0)
462 c->logical_pos =
c->seek_ret;
464 break;
465 }
468 }
469
471
473 }
474
/*
 * Local shorthands; both are removed again by the #undef lines a few lines
 * further down.
 */
/* Byte offset of member x inside AsyncContext. */
475 #define OFFSET(x) offsetof(AsyncContext, x)
/* Short alias for AV_OPT_FLAG_DECODING_PARAM. */
476 #define D AV_OPT_FLAG_DECODING_PARAM
477
480 };
481
482 #undef D
483 #undef OFFSET
484
490 };
491
500 };
501
502 #if 0
503
/* Constants for the self-test code in this "#if 0" section. */
/* Position constant for the seek test; its use is not visible in this view. */
504 #define TEST_SEEK_POS (1536)
/* Assigned to c->logical_size in this section (presumably by the test open callback). */
505 #define TEST_STREAM_SIZE (2048)
506
511
512 /* options */
513 int opt_read_error;
515
517 {
520 c->logical_size = TEST_STREAM_SIZE;
521 return 0;
522 }
523
525 {
526 return 0;
527 }
528
/*
 * Read callback of the "async-test" protocol (installed as .url_read in the
 * protocol table below). This whole section sits inside an "#if 0" region,
 * so it is not compiled as-is.
 *
 * Returns c->opt_read_error immediately when that option is non-zero;
 * otherwise returns read_len, the count of bytes stored into buf. Each byte
 * stored is the low 8 bits of c->logical_pos.
 *
 * NOTE(review): the declarations of c and i, the loop header, and the
 * statement guarded by the first "logical_pos >= logical_size" check are
 * not visible in this view -- confirm against the complete file before
 * relying on the details above.
 */
529 static int async_test_read(
URLContext *
h,
unsigned char *buf,
int size)
530 {
533 int read_len = 0;
534
/* Error injection: a non-zero opt_read_error is returned as the result. */
535 if (
c->opt_read_error)
536 return c->opt_read_error;
537
/* Position already at or past logical_size (the guarded statement is not visible here). */
538 if (
c->logical_pos >=
c->logical_size)
540
/* Store the low byte of the current logical position. */
542 buf[
i] =
c->logical_pos & 0xFF;
543
545 read_len++;
546
/* Stop filling once the position reaches logical_size. */
547 if (
c->logical_pos >=
c->logical_size)
548 break;
549 }
550
551 return read_len;
552 }
553
555 {
558
560 return c->logical_size;
561 } else if (whence == SEEK_CUR) {
562 new_logical_pos =
pos +
c->logical_pos;
563 } else if (whence == SEEK_SET){
564 new_logical_pos =
pos;
565 } else {
567 }
568 if (new_logical_pos < 0)
570
571 c->logical_pos = new_logical_pos;
572 return new_logical_pos;
573 }
574
/*
 * Local shorthands for the async_test_options table that follows; both are
 * #undef'd right after it.
 */
/* Byte offset of member x inside TestContext. */
575 #define OFFSET(x) offsetof(TestContext, x)
/* Short alias for AV_OPT_FLAG_DECODING_PARAM. */
576 #define D AV_OPT_FLAG_DECODING_PARAM
577
/*
 * Option table of the self-test protocol.
 * The one visible entry is named "async-test-read-error" with the help text
 * "cause read fail"; the rest of that entry (target field, type, default,
 * range, flags) and any terminating entry are not visible in this view.
 * NOTE(review): presumably this option sets opt_read_error -- confirm.
 */
578 static const AVOption async_test_options[] = {
579 { "async-test-read-error", "cause read fail",
582 };
583
584 #undef D
585 #undef OFFSET
586
/*
 * AVClass used as .priv_data_class of the "async-test" protocol entry below.
 * It supplies async_test_options as the class option table; its other
 * members are not visible in this view.
 */
587 static const AVClass async_test_context_class = {
590 .option = async_test_options,
592 };
593
595 .
name =
"async-test",
596 .url_open2 = async_test_open,
597 .url_read = async_test_read,
598 .url_seek = async_test_seek,
599 .url_close = async_test_close,
601 .priv_data_class = &async_test_context_class,
602 };
603
605 {
612 unsigned char buf[4096];
614
616 ffurl_register_protocol(&ff_async_test_protocol);
617
618 /*
619 * test normal read
620 */
624
627
629 read_len = 0;
630 while (1) {
634 break;
635 }
637 break;
641 } else {
642 for (
i = 0;
i <
ret; ++
i) {
643 if (buf[
i] != (
pos & 0xFF)) {
644 printf(
"read-mismatch: actual %d, expecting %d, at %"PRId64
"\n",
645 (
int)buf[
i], (
int)(
pos & 0xFF),
pos);
646 break;
647 }
649 }
650 }
651
653 }
654 printf(
"read: %"PRId64
"\n", read_len);
655
656 /*
657 * test normal seek
658 */
661
664
665 read_len = 0;
666 while (1) {
669 break;
671 break;
675 } else {
676 for (
i = 0;
i <
ret; ++
i) {
677 if (buf[
i] != (
pos & 0xFF)) {
678 printf(
"read-mismatch: actual %d, expecting %d, at %"PRId64
"\n",
679 (
int)buf[
i], (
int)(
pos & 0xFF),
pos);
680 break;
681 }
683 }
684 }
685
687 }
688 printf(
"read: %"PRId64
"\n", read_len);
689
692
693 /*
694 * test read error
695 */
701
704
708 return 0;
709 }
710
711 #endif