1 /*
2 * Advanced Message Queuing Protocol (AMQP) 0-9-1
3 * Copyright (c) 2020 Andriy Gelman
4 *
5 * This file is part of FFmpeg.
6 *
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
11 *
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
22 #include <amqp.h>
23 #include <amqp_tcp_socket.h>
24 #include <sys/time.h>
32
/* rabbitmq-c connection state handle. Presumably the field accessed as
 * s->conn further down, where it is assigned from amqp_new_connection()
 * and released with amqp_destroy_connection() -- the enclosing struct
 * header is not visible in this excerpt, TODO confirm. */
35 amqp_connection_state_t
conn;
44
/* Channel number constant; its uses are not visible in this excerpt
 * (presumably the channel passed to the rabbitmq-c calls -- confirm). */
46 #define DEFAULT_CHANNEL 1
47
/* Byte offset of field x inside AMQPContext, used by the option entries. */
48 #define OFFSET(x) offsetof(AMQPContext, x)
/* Short aliases for the decoding / encoding option flags. */
49 #define D AV_OPT_FLAG_DECODING_PARAM
50 #define E AV_OPT_FLAG_ENCODING_PARAM
/* Option table entries (the array header and some entries are not visible
 * in this excerpt). Positional fields follow the AVOption layout:
 * name, help, offset, type, default, min, max, flags, unit. */
/* pkt_size: integer, default 131072, allowed range 4096..INT_MAX,
 * flagged for both decoding and encoding. */
52 {
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags =
D |
E },
/* connection_timeout: duration option, default -1. The open code below
 * replaces a negative value with h->rw_timeout (when positive) or 5000000,
 * and splits it by 1000000 into seconds and microseconds. */
55 {
"connection_timeout",
"Initial connection timeout",
OFFSET(connection_timeout),
AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags =
D |
E},
/* delivery_mode: integer in 1..2, default AMQP_DELIVERY_PERSISTENT,
 * encoding only; the named constants below share its unit string. */
56 {
"delivery_mode",
"Delivery mode",
OFFSET(delivery_mode),
AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags =
E,
"delivery_mode"},
/* Named constant for delivery_mode: persistent. */
57 {
"persistent",
"Persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0,
E,
"delivery_mode" },
/* Named constant for delivery_mode: non-persistent. */
58 {
"non-persistent",
"Non-persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0,
E,
"delivery_mode" },
60 };
61
/* Body of the connection-open routine; its signature and a number of
 * statements are not visible in this excerpt. Visible steps: derive
 * host, port, credentials and vhost from the URI pieces, create the
 * connection and TCP socket, open the socket with a timeout, log in,
 * then run a nested block that works with a declared queue. Failures
 * unwind through the labels at the bottom and success returns 0.
 * NOTE(review): the character literals spelled 0円 below look like a
 * mis-encoded NUL escape (backslash zero) -- confirm against upstream. */
63 {
66 int port;
67 const char *user, *password =
NULL, *vhost;
68 const char *user_decoded, *password_decoded, *vhost_decoded;
69 char *p;
70 amqp_rpc_reply_t broker_reply;
71 struct timeval tval = { 0 };
72
74
/* Advertise the configured packet size as the maximum packet size. */
76 h->max_packet_size =
s->pkt_size;
77
79 hostname, sizeof(hostname), &port, path, sizeof(path), uri);
80
/* A negative port falls back to 5672. */
81 if (port < 0)
82 port = 5672;
83
/* Guard for an empty hostname or a port outside 1..65535; the handling
 * statements are not visible in this excerpt. */
84 if (hostname[0] == '0円' || port <= 0 || port > 65535 ) {
87 }
88
/* Split credentials in place at the first colon: the text before it is
 * the user, the text after it is the password. */
89 p = strchr(credentials, ':');
90 if (p) {
91 *p = '0円';
92 password = p + 1;
93 }
94
/* A missing or empty password defaults to guest. */
95 if (!password || *password == '0円')
96 password = "guest";
97
99 if (!password_decoded)
101
/* An empty user name defaults to guest. */
102 user = credentials;
103 if (*user == '0円')
104 user = "guest";
105
107 if (!user_decoded) {
110 }
111
112 /* skip query for now */
113 p = strchr(path, '?');
114 if (p)
115 *p = '0円';
116
/* The vhost is the path without its first character; an empty path
 * selects the vhost named by a single slash. */
117 vhost = path;
118 if (*vhost == '0円')
119 vhost = "/";
120 else
121 vhost++; /* skip leading '/' */
122
124 if (!vhost_decoded) {
128 }
129
130 s->conn = amqp_new_connection();
137 }
138
139 s->socket = amqp_tcp_socket_new(
s->conn);
142 goto destroy_connection;
143 }
144
/* A negative connection_timeout is replaced by h->rw_timeout when that
 * is positive, otherwise by 5000000. */
145 if (
s->connection_timeout < 0)
146 s->connection_timeout = (
h->rw_timeout > 0 ?
h->rw_timeout : 5000000);
147
/* Split the timeout into whole seconds and remaining microseconds. */
148 tval.tv_sec =
s->connection_timeout / 1000000;
149 tval.tv_usec =
s->connection_timeout % 1000000;
150 ret = amqp_socket_open_noblock(
s->socket, hostname, port, &tval);
151
154 amqp_error_string2(
ret));
155 goto destroy_connection;
156 }
157
/* Log in with SASL PLAIN using the decoded vhost, user and password.
 * Per the rabbitmq-c amqp_login parameter order the numeric arguments
 * are channel_max 0, frame_max s->pkt_size, heartbeat 0. */
158 broker_reply = amqp_login(
s->conn, vhost_decoded, 0,
s->pkt_size, 0,
159 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
160
161 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
163 server_msg = AMQP_ACCESS_REFUSED;
164 goto close_connection;
165 }
166
/* Check the reply of the preceding RPC (the call itself is not visible
 * in this excerpt). */
168 broker_reply = amqp_get_rpc_reply(
s->conn);
169
170 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
172 server_msg = AMQP_CHANNEL_ERROR;
173 goto close_connection;
174 }
175
/* Nested block working with a declared queue; the condition that
 * guards it is not visible in this excerpt. */
177 amqp_bytes_t queuename;
179 amqp_queue_declare_ok_t *
r;
180
/* Presumably the tail of the queue-declare call whose result is r --
 * the start of the call is not visible, TODO confirm. */
182 0, 0, 0, 1, amqp_empty_table);
183 broker_reply = amqp_get_rpc_reply(
s->conn);
184 if (!
r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
186 server_msg = AMQP_RESOURCE_ERROR;
187 goto close_channel;
188 }
189
/* Copy the queue name out of the reply into queuename_buff; the line
 * that sets queuename.len is not visible here. */
190 /* store queuename */
191 queuename.bytes = queuename_buff;
193 memcpy(queuename.bytes,
r->queue.bytes, queuename.len);
194
/* Arguments naming the configured exchange and routing key; the start
 * of this call is not visible (presumably a queue bind -- confirm). */
196 amqp_cstring_bytes(
s->exchange),
197 amqp_cstring_bytes(
s->routing_key), amqp_empty_table);
198
199 broker_reply = amqp_get_rpc_reply(
s->conn);
200 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
202 server_msg = AMQP_INTERNAL_ERROR;
203 goto close_channel;
204 }
205
207 0, 1, 0, amqp_empty_table);
208
209 broker_reply = amqp_get_rpc_reply(
s->conn);
210 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
212 server_msg = AMQP_INTERNAL_ERROR;
213 goto close_channel;
214 }
215 }
216
220 return 0;
221
/* Error unwinding: each label falls through to the ones after it, so
 * close_channel also runs the connection close and destroy steps. */
222 close_channel:
224 close_connection:
225 amqp_connection_close(
s->conn, server_msg);
226 destroy_connection:
227 amqp_destroy_connection(
s->conn);
228
233 }
234
/* Body of the send routine; its signature and most of the publish call
 * are not visible in this excerpt. The visible part fetches the socket
 * descriptor and fills in the message properties. */
236 {
239 int fd = amqp_socket_get_sockfd(
s->socket);
240
242 amqp_basic_properties_t props;
243
247
/* Only content type and delivery mode are flagged as present. */
248 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
/* NOTE(review): the usual MIME spelling is application/octet-stream;
 * this value may be intentional -- confirm before changing. */
249 props.content_type = amqp_cstring_bytes("octet/stream");
/* Delivery mode comes from the delivery_mode option. */
250 props.delivery_mode =
s->delivery_mode;
251
/* Routing key argument of a call whose start is not visible here. */
253 amqp_cstring_bytes(
s->routing_key), 0, 0,
255
259 }
260
262 }
263
/* Body of the receive routine; its signature and several statements
 * are not visible in this excerpt. The visible part fetches the socket
 * descriptor, consumes one message into envelope and tracks oversized
 * message bodies. */
265 {
267 int fd = amqp_socket_get_sockfd(
s->socket);
269
270 amqp_rpc_reply_t broker_reply;
272
276
/* Let the library release buffers before consuming the next message. */
277 amqp_maybe_release_buffers(
s->conn);
/* Consume one message with a NULL timeout and flags 0; per the
 * rabbitmq-c documentation a NULL timeout blocks until a message
 * arrives. */
278 broker_reply = amqp_consume_message(
s->conn, &
envelope,
NULL, 0);
279
280 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
282
/* Record the largest body length seen so far; the guarding condition is
 * not visible here. The value is printed in the message below, which
 * suggests a larger -pkt_size. */
284 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
envelope.message.body.len);
286 "Message will be truncated. Setting -pkt_size %d "
287 "may resolve this issue.\n",
s->pkt_size_overflow);
288 }
290
293
295 }
296
/* Body of the close routine; its signature and some statements are not
 * visible in this excerpt. Closes the connection with the success reply
 * code, destroys the connection object and returns 0. The results of
 * both library calls are not checked. */
298 {
301 amqp_connection_close(
s->conn, AMQP_REPLY_SUCCESS);
302 amqp_destroy_connection(
s->conn);
303
304 return 0;
305 }
306
312 };
313
323 };