1 /*
2 * Input async protocol.
3 * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * Based on libavformat/cache.c by Michael Niedermayer
22 */
23
24 /**
25 * @TODO
26 * support timeout
27 * support work with concatdec, hls
28 */
29
38 #include <stdint.h>
39
40 #if HAVE_UNISTD_H
41 #include <unistd.h>
42 #endif
43
44 #define BUFFER_CAPACITY (4 * 1024 * 1024)
45 #define READ_BACK_CAPACITY (4 * 1024 * 1024)
46 #define SHORT_SEEK_THRESHOLD (256 * 1024)
47
49 {
52
55
59
65
69
73
78
82
84 {
89
91 return 0;
92 }
93
95 {
97 }
98
100 {
103 }
104
106 {
108 }
109
111 {
113 }
114
116 {
117 int ret;
118
122
126 }
127
128 return ret;
129 }
130
132 {
135 }
136
138 {
140 }
141
143 {
147 return 0;
148 }
149
151 {
154
156 return 1;
157
160
162 }
163
165 {
168 int ret;
169
172
173 return ret;
174 }
175
177 {
181 int ret = 0;
182 int64_t seek_ret;
183
184 while (1) {
185 int fifo_space, to_copy;
186
193 break;
194 }
195
198 if (seek_ret >= 0) {
202 }
203
207
208
211 continue;
212 }
213
219 continue;
220 }
222
223 to_copy =
FFMIN(4096, fifo_space);
225
227 if (ret <= 0) {
231 }
232
235 }
236
238 }
239
241 {
243 int ret;
245
247
249 if (ret < 0)
250 goto fifo_fail;
251
252 /* wrap interrupt callback */
255 if (ret != 0) {
257 goto url_fail;
258 }
259
262
264 if (ret != 0) {
266 goto mutex_fail;
267 }
268
270 if (ret != 0) {
272 goto cond_wakeup_main_fail;
273 }
274
276 if (ret != 0) {
278 goto cond_wakeup_background_fail;
279 }
280
282 if (ret) {
284 goto thread_fail;
285 }
286
287 return 0;
288
289 thread_fail:
291 cond_wakeup_background_fail:
293 cond_wakeup_main_fail:
295 mutex_fail:
297 url_fail:
299 fifo_fail:
300 return ret;
301 }
302
304 {
306 int ret;
307
312
314 if (ret != 0)
316
322
323 return 0;
324 }
325
327 void (*
func)(
void*,
void*,
int))
328 {
332 int ret = 0;
333
335
336 while (to_read > 0) {
337 int fifo_size, to_copy;
340 break;
341 }
343 to_copy =
FFMIN(to_read, fifo_size);
344 if (to_copy > 0) {
347 dest = (
uint8_t *)dest + to_copy;
349 to_read -= to_copy;
350 ret = size - to_read;
351
352 if (to_read <= 0 || !read_complete)
353 break;
355 if (ret <= 0) {
358 else
360 }
361 break;
362 }
365 }
366
369
370 return ret;
371 }
372
374 {
376 }
377
379 // do not copy
380 }
381
383 {
386 int64_t ret;
387 int64_t new_logical_pos;
388 int fifo_size;
389 int fifo_size_of_read_back;
390
394 } else if (whence == SEEK_CUR) {
397 } else if (whence == SEEK_SET){
399 new_logical_pos = pos;
400 } else {
402 }
403 if (new_logical_pos < 0)
405
409 /* current position */
411 }
else if ((new_logical_pos >= (c->
logical_pos - fifo_size_of_read_back)) &&
413 int pos_delta = (int)(new_logical_pos - c->
logical_pos);
414 /* fast seek */
417 (
int)(new_logical_pos - c->
logical_pos), fifo_size);
418
419 if (pos_delta > 0) {
420 // fast seek forwards
422 } else {
423 // fast seek backwards
426 }
427
430 /* can not seek */
433 /* beyond end */
435 }
436
438
444
445 while (1) {
448 break;
449 }
454 break;
455 }
458 }
459
461
462 return ret;
463 }
464
465 #define OFFSET(x) offsetof(Context, x)
466 #define D AV_OPT_FLAG_DECODING_PARAM
467
470 };
471
472 #undef D
473 #undef OFFSET
474
480 };
481
488 .priv_data_size =
sizeof(
Context),
489 .priv_data_class = &async_context_class,
490 };
491
492 #ifdef TEST
493
494 #define TEST_SEEK_POS (1536)
495 #define TEST_STREAM_SIZE (2048)
496
497 typedef struct TestContext {
499 int64_t logical_pos;
500 int64_t logical_size;
501
502 /* options */
503 int opt_read_error;
504 } TestContext;
505
507 {
509 c->logical_pos = 0;
510 c->logical_size = TEST_STREAM_SIZE;
511 return 0;
512 }
513
515 {
516 return 0;
517 }
518
519 static int async_test_read(
URLContext *h,
unsigned char *
buf,
int size)
520 {
522 int i;
523 int read_len = 0;
524
525 if (c->opt_read_error)
526 return c->opt_read_error;
527
528 if (c->logical_pos >= c->logical_size)
530
531 for (i = 0; i <
size; ++i) {
532 buf[i] = c->logical_pos & 0xFF;
533
534 c->logical_pos++;
535 read_len++;
536
537 if (c->logical_pos >= c->logical_size)
538 break;
539 }
540
541 return read_len;
542 }
543
544 static int64_t async_test_seek(
URLContext *h, int64_t pos,
int whence)
545 {
547 int64_t new_logical_pos;
548
550 return c->logical_size;
551 } else if (whence == SEEK_CUR) {
552 new_logical_pos = pos + c->logical_pos;
553 } else if (whence == SEEK_SET){
554 new_logical_pos = pos;
555 } else {
557 }
558 if (new_logical_pos < 0)
560
561 c->logical_pos = new_logical_pos;
562 return new_logical_pos;
563 }
564
565 #define OFFSET(x) offsetof(TestContext, x)
566 #define D AV_OPT_FLAG_DECODING_PARAM
567
568 static const AVOption async_test_options[] = {
569 { "async-test-read-error", "cause read fail",
572 };
573
574 #undef D
575 #undef OFFSET
576
577 static const AVClass async_test_context_class = {
580 .option = async_test_options,
582 };
583
585 .
name =
"async-test",
586 .url_open2 = async_test_open,
587 .url_read = async_test_read,
588 .url_seek = async_test_seek,
589 .url_close = async_test_close,
590 .priv_data_size = sizeof(TestContext),
591 .priv_data_class = &async_test_context_class,
592 };
593
595 {
597 int i;
598 int ret;
600 int64_t pos;
601 int64_t read_len;
602 unsigned char buf[4096];
604
607
608 /*
609 * test normal read
610 */
612 printf("open: %d\n", ret);
613
615 printf("size: %"PRId64"\n", size);
616
618 read_len = 0;
619 while (1) {
622 printf(
"read-error: AVERROR_EOF at %"PRId64
"\n",
ffurl_seek(h, 0, SEEK_CUR));
623 break;
624 }
625 else if (ret == 0)
626 break;
627 else if (ret < 0) {
628 printf(
"read-error: %d at %"PRId64
"\n", ret,
ffurl_seek(h, 0, SEEK_CUR));
630 } else {
631 for (i = 0; i < ret; ++i) {
632 if (buf[i] != (pos & 0xFF)) {
633 printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
634 (int)buf[i], (int)(pos & 0xFF), pos);
635 break;
636 }
637 pos++;
638 }
639 }
640
641 read_len += ret;
642 }
643 printf("read: %"PRId64"\n", read_len);
644
645 /*
646 * test normal seek
647 */
649 printf("read: %d\n", ret);
650
652 printf("seek: %"PRId64"\n", pos);
653
654 read_len = 0;
655 while (1) {
658 break;
659 else if (ret == 0)
660 break;
661 else if (ret < 0) {
662 printf(
"read-error: %d at %"PRId64
"\n", ret,
ffurl_seek(h, 0, SEEK_CUR));
664 } else {
665 for (i = 0; i < ret; ++i) {
666 if (buf[i] != (pos & 0xFF)) {
667 printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
668 (int)buf[i], (int)(pos & 0xFF), pos);
669 break;
670 }
671 pos++;
672 }
673 }
674
675 read_len += ret;
676 }
677 printf("read: %"PRId64"\n", read_len);
678
680 printf("read: %d\n", ret);
681
682 /*
683 * test read error
684 */
688 printf("open: %d\n", ret);
689
691 printf("read: %d\n", ret);
692
696 return 0;
697 }
698
699 #endif
static int wrapped_url_read(void *src, void *dst, int size)
#define READ_BACK_CAPACITY
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
static int async_check_interrupt(void *arg)
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
#define LIBAVUTIL_VERSION_INT
static int async_read_internal(URLContext *h, void *dest, int size, int read_complete, void(*func)(void *, void *, int))
int is_streamed
true if streamed (no seek possible), default = false
AVIOInterruptCB interrupt_callback
#define AVIO_FLAG_READ
read-only
static const AVClass async_context_class
static int ring_size(RingBuffer *ring)
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
static int ring_space(RingBuffer *ring)
static const AVOption options[]
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
URLProtocol ff_async_protocol
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
static int ring_drain(RingBuffer *ring, int offset)
static int ring_generic_write(RingBuffer *ring, void *src, int size, int(*func)(void *, void *, int))
#define av_assert2(cond)
assert() equivalent, that does lie in speed critical code.
#define AV_LOG_TRACE
Extremely verbose debugging, useful for libav* development.
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
#define SHORT_SEEK_THRESHOLD
static int ring_generic_read(RingBuffer *ring, void *dest, int buf_size, void(*func)(void *, void *, int))
#define AVERROR_EOF
End of file.
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
static void * async_buffer_task(void *arg)
Callback for checking whether to abort blocking functions.
static int64_t async_seek(URLContext *h, int64_t pos, int whence)
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
const char * protocol_whitelist
void av_dict_free(AVDictionary **pm)
Free all the memory allocated for an AVDictionary struct and all keys and values. ...
simple assert() macros that are a bit more flexible than ISO C assert().
static void fifo_do_not_copy_func(void *dest, void *src, int size)
static const uint8_t offset[127][2]
static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
AVIOInterruptCB interrupt_callback
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
int ffurl_open_whitelist(URLContext **puc, const char *filename, int flags, const AVIOInterruptCB *int_cb, AVDictionary **options, const char *whitelist)
Create an URLContext for accessing to the resource indicated by url, and open it. ...
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrup a blocking function associated with cb.
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
int av_fifo_generic_peek_at(AVFifoBuffer *f, void *dest, int offset, int buf_size, void(*func)(void *, void *, int))
Feed data at specific position from an AVFifoBuffer to a user-supplied callback.
pthread_t async_buffer_thread
int64_t ffurl_size(URLContext *h)
Return the filesize of the resource accessed by h, AVERROR(ENOSYS) if the operation is not supported ...
a very simple circular buffer FIFO implementation
Describe the class of an AVClass context structure.
int(* func)(AVBPrint *dst, const char *in, const char *arg)
static void ring_destroy(RingBuffer *ring)
static int ring_size_of_read_back(RingBuffer *ring)
static int ring_init(RingBuffer *ring, unsigned int capacity, int read_back_capacity)
int ffurl_close(URLContext *h)
static void ring_reset(RingBuffer *ring)
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
int ffurl_register_protocol(URLProtocol *protocol)
Register the URLProtocol protocol.
int64_t ffurl_seek(URLContext *h, int64_t pos, int whence)
Change the position that will be used by the next read/write operation on the resource accessed by h...
int ffurl_open(URLContext **puc, const char *filename, int flags, const AVIOInterruptCB *int_cb, AVDictionary **options)
int av_dict_set_int(AVDictionary **pm, const char *key, int64_t value, int flags)
Convenience wrapper for av_dict_set that converts the value to a string and stores it...
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
#define AVSEEK_SIZE
Passing this as the "whence" parameter to a seek function causes it to return the filesize without se...
pthread_cond_t cond_wakeup_main
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
static int async_close(URLContext *h)
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
#define BUFFER_CAPACITY
support timeout support work with concatdec, hls
unbuffered private I/O API
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
int main(int argc, char **argv)
void av_fifo_reset(AVFifoBuffer *f)
Reset the AVFifoBuffer to the state right after av_fifo_alloc, in particular it is emptied...
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
static int async_read(URLContext *h, unsigned char *buf, int size)
int ffurl_read(URLContext *h, unsigned char *buf, int size)
Read up to size bytes from the resource accessed by h, and store the read bytes in buf...
pthread_cond_t cond_wakeup_background