1 /*
2 * Input async protocol.
3 * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * Based on libavformat/cache.c by Michael Niedermayer
22 */
23
24 /**
25 * @TODO
26 * support timeout
27 * support working together with concatdec and hls
28 */
29
38 #include <stdint.h>
39
40 #if HAVE_UNISTD_H
41 #include <unistd.h>
42 #endif
43
/* 4 MiB. NOTE(review): the use of this constant is not visible in this copy;
 * presumably the capacity of the read-ahead FIFO -- confirm. */
44 #define BUFFER_CAPACITY (4 * 1024 * 1024)
/* 4 MiB. NOTE(review): use not visible here; presumably the amount of
 * already-consumed data kept for backward seeks -- confirm. */
45 #define READ_BACK_CAPACITY (4 * 1024 * 1024)
/* 256 KiB. NOTE(review): use not visible here; presumably the largest
 * forward distance still served as a fast seek -- confirm. */
46 #define SHORT_SEEK_THRESHOLD (256 * 1024)
47
49 {
52
55
59
65
69
73
78
82
84 {
89
91 return 0;
92 }
93
95 {
97 }
98
100 {
103 }
104
106 {
108 }
109
111 {
113 }
114
116 {
118
120 if (dest)
123
127 }
128
130 }
131
133 {
137
140 c->inner_io_error =
ret < 0 ?
ret : 0;
141
142 return c->inner_io_error;
143 }
144
146 {
149 }
150
152 {
154 }
155
157 {
161 return 0;
162 }
163
165 {
168
169 if (
c->abort_request)
170 return 1;
171
173 c->abort_request = 1;
174
175 return c->abort_request;
176 }
177
179 {
184 int64_t seek_ret;
185
186 while (1) {
187 int fifo_space, to_copy;
188
191 c->io_eof_reached = 1;
195 break;
196 }
197
198 if (
c->seek_request) {
199 seek_ret =
ffurl_seek(
c->inner,
c->seek_pos,
c->seek_whence);
200 if (seek_ret >= 0) {
201 c->io_eof_reached = 0;
204 }
205
206 c->seek_completed = 1;
207 c->seek_ret = seek_ret;
209
210
213 continue;
214 }
215
217 if (
c->io_eof_reached || fifo_space <= 0) {
221 continue;
222 }
224
225 to_copy =
FFMIN(4096, fifo_space);
227
230 c->io_eof_reached = 1;
231 if (
c->inner_io_error < 0)
232 c->io_error =
c->inner_io_error;
233 }
234
237 }
238
240 }
241
243 {
247
249
252 goto fifo_fail;
253
254 /* wrap interrupt callback */
255 c->interrupt_callback =
h->interrupt_callback;
259 goto url_fail;
260 }
261
263 h->is_streamed =
c->inner->is_streamed;
264
269 goto mutex_fail;
270 }
271
276 goto cond_wakeup_main_fail;
277 }
278
283 goto cond_wakeup_background_fail;
284 }
285
290 goto thread_fail;
291 }
292
293 return 0;
294
295 thread_fail:
297 cond_wakeup_background_fail:
299 cond_wakeup_main_fail:
301 mutex_fail:
303 url_fail:
305 fifo_fail:
307 }
308
310 {
313
315 c->abort_request = 1;
318
322
328
329 return 0;
330 }
331
333 {
336 int read_complete = !dest;
339
341
342 while (to_read > 0) {
343 int fifo_size, to_copy;
346 break;
347 }
349 to_copy =
FFMIN(to_read, fifo_size);
350 if (to_copy > 0) {
352 if (dest)
353 dest = (uint8_t *)dest + to_copy;
354 c->logical_pos += to_copy;
355 to_read -= to_copy;
357
358 if (to_read <= 0 || !read_complete)
359 break;
360 }
else if (
c->io_eof_reached) {
364 else
366 }
367 break;
368 }
371 }
372
375
377 }
378
380 {
382 }
383
385 {
389 int64_t new_logical_pos;
390 int fifo_size;
391 int fifo_size_of_read_back;
392
395 return c->logical_size;
396 } else if (whence == SEEK_CUR) {
398 new_logical_pos =
pos +
c->logical_pos;
399 } else if (whence == SEEK_SET){
401 new_logical_pos =
pos;
402 } else {
404 }
405 if (new_logical_pos < 0)
407
410 if (new_logical_pos ==
c->logical_pos) {
411 /* current position */
412 return c->logical_pos;
413 }
else if ((new_logical_pos >= (
c->logical_pos - fifo_size_of_read_back)) &&
415 int pos_delta = (
int)(new_logical_pos -
c->logical_pos);
416 /* fast seek */
418 new_logical_pos, (
int)
c->logical_pos,
419 (
int)(new_logical_pos -
c->logical_pos), fifo_size);
420
421 if (pos_delta > 0) {
422 // fast seek forwards
424 } else {
425 // fast seek backwards
427 c->logical_pos = new_logical_pos;
428 }
429
430 return c->logical_pos;
431 }
else if (
c->logical_size <= 0) {
432 /* can not seek */
434 }
else if (new_logical_pos >
c->logical_size) {
435 /* beyond end */
437 }
438
440
442 c->seek_pos = new_logical_pos;
443 c->seek_whence = SEEK_SET;
444 c->seek_completed = 0;
446
447 while (1) {
450 break;
451 }
452 if (
c->seek_completed) {
453 if (
c->seek_ret >= 0)
454 c->logical_pos =
c->seek_ret;
456 break;
457 }
460 }
461
463
465 }
466
/* Byte offset of field x inside Context; undefined again after the option
 * table that follows. */
467 #define OFFSET(x) offsetof(Context, x)
/* Short alias for AV_OPT_FLAG_DECODING_PARAM; also undefined again below. */
468 #define D AV_OPT_FLAG_DECODING_PARAM
469
472 };
473
474 #undef D
475 #undef OFFSET
476
482 };
483
490 .priv_data_size =
sizeof(
Context),
492 };
493
494 #if 0
495
/* NOTE(review): no use of this constant is visible in this copy; presumably
 * the seek target exercised by the test driver -- confirm. */
496 #define TEST_SEEK_POS (1536)
/* Value assigned to c->logical_size further down in this file. */
497 #define TEST_STREAM_SIZE (2048)
498
501 int64_t logical_pos;
502 int64_t logical_size;
503
504 /* options */
505 int opt_read_error;
507
509 {
512 c->logical_size = TEST_STREAM_SIZE;
513 return 0;
514 }
515
517 {
518 return 0;
519 }
520
/**
 * Read callback registered as .url_read of the "async-test" protocol.
 *
 * Fills buf with synthetic data in which each byte is the low 8 bits of
 * the logical position at the time it is written.
 *
 * NOTE(review): several lines of this function are not visible in this
 * copy (the declarations of `c` and `i`, the loop header, and the
 * statement guarded by the first position check). Confirm the details
 * against the complete source.
 *
 * @param h    URL context (its use is on lines not visible here)
 * @param buf  destination buffer that receives the generated bytes
 * @param size requested byte count; presumably bounds the loop -- TODO confirm
 * @return c->opt_read_error when it is non-zero, otherwise read_len, the
 *         number of bytes counted while filling buf
 */
521 static int async_test_read(
URLContext *
h,
unsigned char *buf,
int size)
522 {
525 int read_len = 0;
526
/* error injection: a non-zero opt_read_error is returned unchanged */
527 if (
c->opt_read_error)
528 return c->opt_read_error;
529
/* NOTE(review): the statement guarded by this check is not visible here */
530 if (
c->logical_pos >=
c->logical_size)
532
/* byte value = low 8 bits of the current logical position */
534 buf[
i] =
c->logical_pos & 0xFF;
535
537 read_len++;
538
/* stop once the logical position has reached the logical size */
539 if (
c->logical_pos >=
c->logical_size)
540 break;
541 }
542
543 return read_len;
544 }
545
/**
 * Seek callback registered as .url_seek of the "async-test" protocol.
 *
 * For SEEK_CUR the target is pos + c->logical_pos, for SEEK_SET it is pos.
 * The resolved target is stored in c->logical_pos and returned.
 *
 * NOTE(review): the declaration of `c`, the condition of the first branch,
 * the body of the final else branch and the statement guarded by the
 * negative-target check are not visible in this copy. Confirm against
 * the complete source.
 *
 * @param h      URL context (its use is on lines not visible here)
 * @param pos    offset, interpreted according to whence
 * @param whence SEEK_CUR or SEEK_SET; handling of other values is not
 *               fully visible here
 * @return the new logical position, or c->logical_size from the first branch
 */
546 static int64_t async_test_seek(
URLContext *
h, int64_t
pos,
int whence)
547 {
549 int64_t new_logical_pos;
550
/* NOTE(review): guarding condition not visible; presumably a size query -- confirm */
552 return c->logical_size;
553 } else if (whence == SEEK_CUR) {
554 new_logical_pos =
pos +
c->logical_pos;
555 } else if (whence == SEEK_SET){
556 new_logical_pos =
pos;
557 } else {
559 }
/* NOTE(review): the handling of a negative target is not visible here */
560 if (new_logical_pos < 0)
562
/* commit the resolved position and report it to the caller */
563 c->logical_pos = new_logical_pos;
564 return new_logical_pos;
565 }
566
/* Byte offset of field x inside TestContext; undefined again after the
 * option table that follows. */
567 #define OFFSET(x) offsetof(TestContext, x)
/* Short alias for AV_OPT_FLAG_DECODING_PARAM; also undefined again below. */
568 #define D AV_OPT_FLAG_DECODING_PARAM
569
/*
 * Option table referenced by async_test_context_class.
 * NOTE(review): only the name and help text of the one visible entry
 * survive in this copy. The remaining fields of that entry and any
 * terminating entry are not visible; presumably it is bound to
 * opt_read_error -- confirm against the complete source.
 */
570 static const AVOption async_test_options[] = {
571 { "async-test-read-error", "cause read fail",
574 };
575
576 #undef D
577 #undef OFFSET
578
/*
 * AVClass whose option table is async_test_options; it is referenced as
 * .priv_data_class by the "async-test" protocol table below.
 * NOTE(review): other initializers of this struct are not visible in
 * this copy.
 */
579 static const AVClass async_test_context_class = {
582 .option = async_test_options,
584 };
585
587 .
name =
"async-test",
588 .url_open2 = async_test_open,
589 .url_read = async_test_read,
590 .url_seek = async_test_seek,
591 .url_close = async_test_close,
593 .priv_data_class = &async_test_context_class,
594 };
595
597 {
603 int64_t read_len;
604 unsigned char buf[4096];
606
608 ffurl_register_protocol(&ff_async_test_protocol);
609
610 /*
611 * test normal read
612 */
616
619
621 read_len = 0;
622 while (1) {
626 break;
627 }
629 break;
633 } else {
634 for (
i = 0;
i <
ret; ++
i) {
635 if (buf[
i] != (
pos & 0xFF)) {
636 printf(
"read-mismatch: actual %d, expecting %d, at %"PRId64
"\n",
637 (
int)buf[
i], (
int)(
pos & 0xFF),
pos);
638 break;
639 }
641 }
642 }
643
645 }
646 printf(
"read: %"PRId64
"\n", read_len);
647
648 /*
649 * test normal seek
650 */
653
656
657 read_len = 0;
658 while (1) {
661 break;
663 break;
667 } else {
668 for (
i = 0;
i <
ret; ++
i) {
669 if (buf[
i] != (
pos & 0xFF)) {
670 printf(
"read-mismatch: actual %d, expecting %d, at %"PRId64
"\n",
671 (
int)buf[
i], (
int)(
pos & 0xFF),
pos);
672 break;
673 }
675 }
676 }
677
679 }
680 printf(
"read: %"PRId64
"\n", read_len);
681
684
685 /*
686 * test read error
687 */
693
696
700 return 0;
701 }
702
703 #endif