1 /*
2 * Advanced Message Queuing Protocol (AMQP) 0-9-1
3 * Copyright (c) 2020 Andriy Gelman
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
31
34 amqp_connection_state_t
conn;
43
45 #define DEFAULT_CHANNEL 1
46
47 #define OFFSET(x) offsetof(AMQPContext, x)
48 #define D AV_OPT_FLAG_DECODING_PARAM
49 #define E AV_OPT_FLAG_ENCODING_PARAM
51 {
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags =
D |
E },
54 {
"connection_timeout",
"Initial connection timeout",
OFFSET(connection_timeout),
AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags =
D |
E},
55 {
"delivery_mode",
"Delivery mode",
OFFSET(delivery_mode),
AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags =
E, .unit =
"delivery_mode"},
56 {
"persistent",
"Persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
57 {
"non-persistent",
"Non-persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
59 };
60
62 {
65 int port;
66 const char *user, *password =
NULL, *vhost;
67 const char *user_decoded, *password_decoded, *vhost_decoded;
69 amqp_rpc_reply_t broker_reply;
70 struct timeval tval = { 0 };
71
73
75 h->max_packet_size =
s->pkt_size;
76
78 hostname, sizeof(hostname), &port, path, sizeof(path), uri);
79
80 if (port < 0)
81 port = 5672;
82
83 if (hostname[0] == '0円' || port <= 0 || port > 65535 ) {
86 }
87
88 p = strchr(credentials,
':');
92 }
93
94 if (!password || *password == '0円')
95 password = "guest";
96
98 if (!password_decoded)
100
101 user = credentials;
102 if (*user == '0円')
103 user = "guest";
104
106 if (!user_decoded) {
109 }
110
111 /* skip query for now */
112 p = strchr(path,
'?');
115
116 vhost = path;
117 if (*vhost == '0円')
118 vhost = "/";
119 else
120 vhost++; /* skip leading '/' */
121
123 if (!vhost_decoded) {
127 }
128
129 s->conn = amqp_new_connection();
136 }
137
138 s->socket = amqp_tcp_socket_new(
s->conn);
141 goto destroy_connection;
142 }
143
144 if (
s->connection_timeout < 0)
145 s->connection_timeout = (
h->rw_timeout > 0 ?
h->rw_timeout : 5000000);
146
147 tval.tv_sec =
s->connection_timeout / 1000000;
148 tval.tv_usec =
s->connection_timeout % 1000000;
149 ret = amqp_socket_open_noblock(
s->socket, hostname, port, &tval);
150
153 amqp_error_string2(
ret));
154 goto destroy_connection;
155 }
156
157 broker_reply = amqp_login(
s->conn, vhost_decoded, 0,
s->pkt_size, 0,
158 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
159
160 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
162 server_msg = AMQP_ACCESS_REFUSED;
163 goto close_connection;
164 }
165
167 broker_reply = amqp_get_rpc_reply(
s->conn);
168
169 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
171 server_msg = AMQP_CHANNEL_ERROR;
172 goto close_connection;
173 }
174
176 amqp_bytes_t queuename;
178 amqp_queue_declare_ok_t *
r;
179
181 0, 0, 0, 1, amqp_empty_table);
182 broker_reply = amqp_get_rpc_reply(
s->conn);
183 if (!
r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
185 server_msg = AMQP_RESOURCE_ERROR;
186 goto close_channel;
187 }
188
189 /* store queuename */
190 queuename.bytes = queuename_buff;
192 memcpy(queuename.bytes,
r->queue.bytes, queuename.len);
193
195 amqp_cstring_bytes(
s->exchange),
196 amqp_cstring_bytes(
s->routing_key), amqp_empty_table);
197
198 broker_reply = amqp_get_rpc_reply(
s->conn);
199 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
201 server_msg = AMQP_INTERNAL_ERROR;
202 goto close_channel;
203 }
204
206 0, 1, 0, amqp_empty_table);
207
208 broker_reply = amqp_get_rpc_reply(
s->conn);
209 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
211 server_msg = AMQP_INTERNAL_ERROR;
212 goto close_channel;
213 }
214 }
215
219 return 0;
220
221 close_channel:
223 close_connection:
224 amqp_connection_close(
s->conn, server_msg);
225 destroy_connection:
226 amqp_destroy_connection(
s->conn);
227
232 }
233
235 {
238 int fd = amqp_socket_get_sockfd(
s->socket);
239
241 amqp_basic_properties_t props;
242
246
247 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
248 props.content_type = amqp_cstring_bytes("octet/stream");
249 props.delivery_mode =
s->delivery_mode;
250
252 amqp_cstring_bytes(
s->routing_key), 0, 0,
254
258 }
259
261 }
262
264 {
266 int fd = amqp_socket_get_sockfd(
s->socket);
268
269 amqp_rpc_reply_t broker_reply;
271
275
276 amqp_maybe_release_buffers(
s->conn);
277 broker_reply = amqp_consume_message(
s->conn, &
envelope,
NULL, 0);
278
279 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
281
283 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
envelope.message.body.len);
285 "Message will be truncated. Setting -pkt_size %d "
286 "may resolve this issue.\n",
s->pkt_size_overflow);
287 }
289
292
294 }
295
297 {
300 amqp_connection_close(
s->conn, AMQP_REPLY_SUCCESS);
301 amqp_destroy_connection(
s->conn);
302
303 return 0;
304 }
305
311 };
312
322 };