1 /*
2 * Copyright (c) 2013 Stefano Sabatini
3 *
4 * This file is part of FFmpeg.
5 *
6 * FFmpeg is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * FFmpeg is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with FFmpeg; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
21 /**
22 * @file
23 * receive commands through libzeromq and broker them to filters
24 */
25
26 #include <zmq.h>
35
43
/** Byte offset of member x inside ZMQContext. */
#define OFFSET(x) offsetof(ZMQContext, x)
/**
 * Option flags: filtering parameter, valid for both audio and video.
 * The expansion is parenthesised so that FLAGS behaves as a single operand
 * when it is combined with other operators (for example FLAGS & mask).
 */
#define FLAGS (AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM)
49 { NULL }
50 };
51
/*
 * Setup body (the signature line is not part of this excerpt).
 * The visible statements create a ZMQ context with zmq_ctx_new() and store it
 * in zmq->zmq. The log strings show three separate failure reports: context
 * creation, socket creation and binding the socket to an address.
 * The final statement returns 0.
 */
53 {
55
56 zmq->
zmq = zmq_ctx_new();
/* Failure text for context creation; appends zmq_strerror(errno). */
59 "Could not create ZMQ context: %s\n", zmq_strerror(errno));
61 }
62
/* Failure text for socket creation; the creating call is not visible here. */
66 "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
68 }
69
/*
 * Failure text for binding. The format expects the address first and an
 * error description second; the arguments are on a line not visible here.
 */
72 "Could not bind ZMQ socket to address '%s': %s\n",
75 }
76
78 return 0;
79 }
80
/*
 * Teardown body (the signature line is not part of this excerpt): releases
 * the ZMQ context held in zmq->zmq.
 *
 * NOTE(review): libzmq documents zmq_ctx_destroy() as deprecated in favour of
 * zmq_ctx_term() -- confirm the minimum supported libzmq before switching.
 */
82 {
84
86 zmq_ctx_destroy(zmq->
zmq);
87 }
88
89 typedef struct {
92
/* Whitespace character set: space, form feed, tab, newline, carriage return. */
93 #define SPACES " \f\t\n\r"
94
/*
 * Command-parsing body (the signature line is not part of this excerpt).
 * A cursor named buf is pointed at command_str. The visible log strings show
 * two rejection cases: input without a target and input without a command.
 * The final statement returns 0.
 */
96 {
97 const char **
buf = &command_str;
98
/* Logged when no target is found; echoes the complete input string. */
102 "No target specified in command '%s'\n", command_str);
104 }
105
/* Logged when no command is found; echoes the complete input string. */
109 "No command specified in command '%s'\n", command_str);
111 }
112
114 return 0;
115 }
116
/*
 * Message-receiving body (the signature line is not part of this excerpt;
 * presumably recv_msg(), whose call elsewhere in this file passes a char **
 * and an int * that match the *buf / *buf_size writes below).
 * Visible behaviour: initialise a zmq_msg_t, try a non-blocking receive on
 * zmq->responder, set *buf_size to the message size plus one, copy the
 * payload into *buf, NUL-terminate it and close the message.
 */
118 {
120 zmq_msg_t msg;
122
123 if (zmq_msg_init(&msg) == -1) {
125 "Could not initialize receive message: %s\n", zmq_strerror(errno));
127 }
128
/*
 * ZMQ_DONTWAIT makes the receive non-blocking. An errno of EAGAIN (nothing
 * queued) is the one failure that is not logged.
 */
129 if (zmq_msg_recv(&msg, zmq->
responder, ZMQ_DONTWAIT) == -1) {
130 if (errno != EAGAIN)
132 "Could not receive message: %s\n", zmq_strerror(errno));
135 }
136
/* One extra byte is reserved for the terminating NUL written below. */
137 *buf_size = zmq_msg_size(&msg) + 1;
139 if (!*buf) {
142 }
/*
 * NOTE(review): *buf_size is the message size plus one, so this copy reads
 * one byte more than zmq_msg_size() reports for the payload. Looks like a
 * one-byte over-read of the message data; copying *buf_size - 1 bytes would
 * avoid it, since the last byte is overwritten with 0 right after. Confirm.
 */
143 memcpy(*buf, zmq_msg_data(&msg), *buf_size);
144 (*buf)[*buf_size-1] = 0;
145
147 zmq_msg_close(&msg);
149 }
150
/*
 * Command-servicing body (the signature line is not part of this excerpt).
 * Loops until recv_msg() reports a negative result. Each iteration receives
 * one command, parses it, processes it, formats a textual reply and sends the
 * reply on zmq->responder. Several statements of the loop are not visible in
 * this excerpt.
 */
152 {
155
156 while (1) {
/* cmd_buf is handed to the processing step together with its size (below). */
157 char cmd_buf[1024];
158 char *recv_buf, *send_buf;
159 int recv_buf_size;
162
163 /* receive command */
/* A negative result from recv_msg() ends the loop. */
164 if (
recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
165 break;
167
168 /* parse command */
172 }
173
174 /* process command */
176 "Processing command #%d target:%s command:%s arg:%s\n",
180 cmd_buf, sizeof(cmd_buf),
/*
 * Reply arguments: the negated result code, its av_err2str() text, and
 * cmd_buf, preceded by a newline only when cmd_buf is non-empty.
 */
183 -ret,
av_err2str(ret), cmd_buf[0] ?
"\n" :
"", cmd_buf);
184 if (!send_buf) {
187 }
189 "Sending command reply for command #%d:\n%s\n",
/* strlen() bytes are sent, so the reply goes out without its terminating NUL. */
191 if (zmq_send(zmq->
responder, send_buf, strlen(send_buf), 0) == -1)
194
198 recv_buf_size = 0;
202 }
203
205 }
206
/* Everything up to the matching #endif is compiled only when CONFIG_ZMQ_FILTER is non-zero. */
207 #if CONFIG_ZMQ_FILTER
208
/* Makes zmq_options an alias for the identifier options. */
209 #define zmq_options options
211
/*
 * Two brace-initialised tables follow, each closed by a { NULL } sentinel
 * entry. Their declaration lines are not visible in this excerpt.
 */
213 {
217 },
218 { NULL }
219 };
220
222 {
225 },
226 { NULL }
227 };
228
/*
 * Tail of a designated-initialiser record: the description string is wrapped
 * in NULL_IF_CONFIG_SMALL() and priv_class points at zmq_class.
 */
231 .description =
NULL_IF_CONFIG_SMALL(
"Receive commands through ZMQ and broker them to filters."),
237 .priv_class = &zmq_class,
238 };
239
240 #endif
241
/*
 * Everything up to the matching #endif is compiled only when
 * CONFIG_AZMQ_FILTER is non-zero.
 * NOTE(review): presumably the audio counterpart of the zmq section (FFmpeg
 * "a" prefix convention) -- confirm against the pad definitions, which are
 * not visible in this excerpt.
 */
242 #if CONFIG_AZMQ_FILTER
243
/* Makes azmq_options an alias for the identifier options. */
244 #define azmq_options options
246
/*
 * Two brace-initialised tables follow, each closed by a { NULL } sentinel
 * entry. Their declaration lines are not visible in this excerpt.
 */
248 {
253 },
254 { NULL }
255 };
256
258 {
261 },
262 { NULL }
263 };
264
/*
 * Tail of a designated-initialiser record: the description string is wrapped
 * in NULL_IF_CONFIG_SMALL() and priv_class points at azmq_class.
 */
267 .description =
NULL_IF_CONFIG_SMALL(
"Receive commands through ZMQ and broker them to filters."),
273 .priv_class = &azmq_class,
274 };
275
276 #endif