1 /*
2 * ZeroMQ Protocol
3 * Copyright (c) 2019 Andriy Gelman
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <zmq.h>
28
/* Error string for the current libzmq error: zmq_strerror() applied to the
 * value zmq_errno() returns at the point where the macro is expanded. */
29 #define ZMQ_STRERROR zmq_strerror(zmq_errno())
30
38
/* Byte offset of member x inside ZMQContext, as computed by offsetof(). */
39 #define OFFSET(x) offsetof(ZMQContext, x)
/* Short aliases for the decoding / encoding AVOption parameter flags. */
40 #define D AV_OPT_FLAG_DECODING_PARAM
41 #define E AV_OPT_FLAG_ENCODING_PARAM
43 {
/* "pkt_size" entry: an AV_OPT_TYPE_INT option located at
 * OFFSET(pkt_size) in ZMQContext, with 131072 as the .i64 default and
 * -1 / INT_MAX as the two numeric bounds; flagged D | E (decoding and
 * encoding). The help string describes it as the maximum send/read packet size.
 * NOTE(review): the table's declaration line and any terminating entry are
 * not visible in this listing. */
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, -1, INT_MAX, .flags =
D |
E },
45 };
46
48 {
/* NOTE(review): fragment; the signature and the statements between the
 * poll item setup and the final return are elided from this listing
 * (presumably the poll call and its error branch; confirm). */
/* Event of interest: writability when `write` is non-zero, else readability. */
50 int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
/* One poll item for the ZeroMQ socket; .fd is set to 0 and .revents
 * starts cleared. */
51 zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
56 }
/* 0 when the requested event bit is set in revents, AVERROR(EAGAIN) if not. */
57 return items.revents & ev ? 0 :
AVERROR(EAGAIN);
58 }
59
61 {
/* NOTE(review): fragment; the signature, local declarations and most of
 * the loop body are elided from this listing. */
64
/* Unconditional loop: no exit is visible here, so the elided statements
 * presumably return from inside it; confirm. */
65 while (1) {
/* Timeout handling is only entered for a strictly positive timeout. */
71 if (timeout > 0) {
/* wait_start evaluating to zero selects this branch; the statement it
 * guards is elided (presumably it records the start time; confirm). */
72 if (!wait_start)
76 }
77 }
78 }
79
81 {
/* NOTE(review): fragment of the open routine; the signature, local
 * declarations, every `if` condition and all error logging are elided from
 * this listing. The comments below describe only the visible statements. */
/* Reset the recorded oversized-message size. */
84 s->pkt_size_overflow = 0;
86
/* Copy the configured pkt_size into the URL context's max_packet_size
 * (a guarding condition, if any, is not visible). */
88 h->max_packet_size =
s->pkt_size;
89
/* Create the ZeroMQ context; the sockets below are created from it. */
90 s->context = zmq_ctx_new();
92 /*errno not set on failure during zmq_ctx_new()*/
95 }
96
100 }
101
102 /*publish during write*/
/* Writer side: create a PUB socket and bind it to uri. */
104 s->socket = zmq_socket(
s->context, ZMQ_PUB);
/* Taken before any socket exists to close, so only the context is torn down. */
107 goto fail_term;
108 }
109
110 ret = zmq_bind(
s->socket, uri);
/* The socket exists here, so cleanup must close it first. */
113 goto fail_close;
114 }
115 }
116
117 /*subscribe for read*/
/* Reader side: create a SUB socket, subscribe, then connect to uri. */
119 s->socket = zmq_socket(
s->context, ZMQ_SUB);
122 goto fail_term;
123 }
124
/* Subscribe with an empty filter of length 0 (per the libzmq documentation
 * an empty ZMQ_SUBSCRIBE value matches all incoming messages). */
125 ret = zmq_setsockopt(
s->socket, ZMQ_SUBSCRIBE,
"", 0);
128 goto fail_close;
129 }
130
131 ret = zmq_connect(
s->socket, uri);
134 goto fail_close;
135 }
136 }
/* Success path. */
137 return 0;
138
/* Cleanup labels: fail_close closes the socket and falls through into
 * fail_term, which terminates the context. The statement that follows
 * zmq_ctx_term() (presumably the error return) is elided. */
139 fail_close:
140 zmq_close(
s->socket);
141 fail_term:
142 zmq_ctx_term(
s->context);
144 }
145
147 {
/* NOTE(review): fragment of the write routine; the signature, local
 * declarations and the statements before and after the send call are
 * elided from this listing. */
150
/* Send `size` bytes from buf on the socket, passing 0 as the flags argument. */
154 ret = zmq_send(
s->socket, buf,
size, 0);
158 }
/* Hand the zmq_send() result back to the caller. */
159 return ret;
/*number of bytes sent*/
160 }
161
163 {
/* NOTE(review): fragment of the read routine; the signature, local
 * declarations, the `if` conditions and at least one statement after the
 * warning are elided from this listing. */
166
/* Receive one message into buf, with `size` as the buffer capacity and 0
 * as the flags argument. */
170 ret = zmq_recv(
s->socket, buf,
size, 0);
174 }
/* Oversized-message branch (its condition is not visible): remember the
 * largest ret seen so far ... */
176 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
ret);
/* ... and warn, suggesting that recorded size as the -pkt_size value. */
177 av_log(
h,
AV_LOG_WARNING,
"Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n",
s->pkt_size_overflow);
179 }
180 return ret;
/*number of bytes read*/
181 }
182
184 {
/* NOTE(review): fragment of the close routine; the signature and the
 * declaration of `s` are elided from this listing. */
/* Tear down in order: close the socket first, then terminate the context.
 * The return values of both calls are ignored. */
186 zmq_close(
s->socket);
187 zmq_ctx_term(
s->context);
/* Always returns 0. */
188 return 0;
189 }
190
196 };
197
207 };