I am trying to implement a network server application in C++ using Boost.Asio.
Here are the requirements I am trying to meet:
- The application creates only one instance of `boost::asio::io_context`.
- The single `io_context` is being `run()` by a shared thread pool. The number of threads is not defined.
- The application can instantiate multiple Server objects. New Servers can be spawned and killed at any time.
- Each Server can handle connections from multiple clients.
I am trying to implement the RAII pattern for the Server class. What I want to guarantee is that when a Server gets deallocated, all of its connections are completely closed. There are 3 ways each connection can be closed:
- Client responds and there is no more work to be done in a connection.
- Server is being deallocated and causes all alive connections to close.
- Connection is killed manually by invoking the `stop()` method.
I have arrived at a solution that seems to meet all of the criteria above, but since Boost.Asio is still quite new to me I wanted to verify that what I am doing is correct. There are also a couple of things that I was specifically not 100% sure about:
- I was trying to remove the `mutex` from the Server class and instead use a `strand` for all of the synchronisation, but I couldn't find a clear way to do it.
- Because the thread pool can consist of only 1 thread, and this thread may be what's calling a Server destructor, I had to invoke `io_context::poll_one()` from the destructor to give all of the pending connections a chance to complete the shutdown and prevent a potential deadlock.
- I would welcome any other suggestions for improvements you could think of.
Anyways, here's the code with some unit tests (live version on Coliru: http://coliru.stacked-crooked.com/a/1afb0dc34dd09008):
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/dispatch.hpp>
#include <cassert>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
using namespace std;
using namespace boost::asio;
using namespace std::placeholders;
class Connection;

class ConnectionDelegate
{
public:
    virtual ~ConnectionDelegate() { }
    virtual class executor executor() const = 0;
    virtual void didReceiveResponse(shared_ptr<Connection> connection) = 0;
};

class Connection: public enable_shared_from_this<Connection>
{
public:
    Connection(string name, io_context& ioContext)
    : _name(name)
    , _ioContext(ioContext)
    , _timer(ioContext)
    {
    }

    const string& name() const
    {
        return _name;
    }

    void setDelegate(ConnectionDelegate *delegate)
    {
        _delegate = delegate;
    }

    void start()
    {
        // Simulate a network request
        _timer.expires_from_now(boost::posix_time::seconds(3));
        _timer.async_wait(bind(&Connection::handleResponse, shared_from_this(), _1));
    }

    void stop()
    {
        _timer.cancel();
    }

private:
    string _name;
    io_context& _ioContext;
    boost::asio::deadline_timer _timer;
    ConnectionDelegate *_delegate;

    void handleResponse(const boost::system::error_code& errorCode)
    {
        if (errorCode == error::operation_aborted)
        {
            return;
        }
        dispatch(_delegate->executor(),
                 bind(&ConnectionDelegate::didReceiveResponse, _delegate, shared_from_this()));
    }
};

class Server: public ConnectionDelegate
{
public:
    Server(string name, io_context& ioContext)
    : _name(name)
    , _ioContext(ioContext)
    , _strand(_ioContext)
    {
    }

    ~Server()
    {
        stop();
        assert(_connections.empty());
        assert(_connectionIterators.empty());
    }

    weak_ptr<Connection> addConnection(string name)
    {
        auto connection = shared_ptr<Connection>(new Connection(name, _ioContext), bind(&Server::deleteConnection, this, _1));
        {
            lock_guard<mutex> lock(_mutex);
            _connectionIterators[connection.get()] = _connections.insert(_connections.end(), connection);
        }
        connection->setDelegate(this);
        connection->start();
        return connection;
    }

    vector<shared_ptr<Connection>> connections()
    {
        lock_guard<mutex> lock(_mutex);
        vector<shared_ptr<Connection>> connections;
        for (auto weakConnection: _connections)
        {
            if (auto connection = weakConnection.lock())
            {
                connections.push_back(connection);
            }
        }
        return connections;
    }

    void stop()
    {
        auto connectionsCount = 0;
        for (auto connection: connections())
        {
            ++connectionsCount;
            connection->stop();
        }
        while (connectionsCount != 0)
        {
            _ioContext.poll_one();
            connectionsCount = connections().size();
        }
    }

    // MARK: - ConnectionDelegate
    class executor executor() const override
    {
        return _strand;
    }

    void didReceiveResponse(shared_ptr<Connection> connection) override
    {
        // The strand protects the shared resources accessed by this method.
        assert(_strand.running_in_this_thread());
        // Here I plan to execute some business logic and I need both Server & Connection to be alive.
        std::cout << "didReceiveResponse - server: " << _name << ", connection: " << connection->name() << endl;
    }

private:
    typedef list<weak_ptr<Connection>> ConnectionsList;
    typedef unordered_map<Connection*, ConnectionsList::iterator> ConnectionsIteratorMap;

    string _name;
    io_context& _ioContext;
    io_context::strand _strand;
    ConnectionsList _connections;
    ConnectionsIteratorMap _connectionIterators;
    mutex _mutex;

    void deleteConnection(Connection *connection)
    {
        {
            lock_guard<mutex> lock(_mutex);
            auto iterator = _connectionIterators[connection];
            _connections.erase(iterator);
            _connectionIterators.erase(connection);
        }
        default_delete<Connection>()(connection);
    }
};

void testConnectionClosedByTheServer()
{
    io_context ioContext;
    auto server = make_unique<Server>("server1", ioContext);
    auto weakConnection = server->addConnection("connection1");
    assert(weakConnection.expired() == false);
    assert(server->connections().size() == 1);
    server.reset();
    assert(weakConnection.expired() == true);
}

void testConnectionClosedAfterResponse()
{
    io_context ioContext;
    auto server = make_unique<Server>("server1", ioContext);
    auto weakConnection = server->addConnection("connection1");
    assert(weakConnection.expired() == false);
    assert(server->connections().size() == 1);
    while (!weakConnection.expired())
    {
        ioContext.poll_one();
    }
    assert(server->connections().size() == 0);
}

void testConnectionClosedManually()
{
    io_context ioContext;
    auto server = make_unique<Server>("server1", ioContext);
    auto weakConnection = server->addConnection("connection1");
    assert(weakConnection.expired() == false);
    assert(server->connections().size() == 1);
    weakConnection.lock()->stop();
    ioContext.run();
    assert(weakConnection.expired() == true);
    assert(server->connections().size() == 0);
}

void testMultipleServers()
{
    io_context ioContext;
    auto server1 = make_unique<Server>("server1", ioContext);
    auto server2 = make_unique<Server>("server2", ioContext);
    auto weakConnection1 = server1->addConnection("connection1");
    auto weakConnection2 = server2->addConnection("connection2");
    server1.reset();
    assert(weakConnection1.expired() == true);
    assert(weakConnection2.expired() == false);
}

void testDeadLock()
{
    io_context ioContext;
    auto server = make_unique<Server>("server1", ioContext);
    auto weakConnection = server->addConnection("connection1");
    assert(weakConnection.expired() == false);
    assert(server->connections().size() == 1);
    auto connection = weakConnection.lock();
    server.reset(); // <-- deadlock, but that's OK, I will try to prevent it by other means
}

int main()
{
    testConnectionClosedByTheServer();
    testConnectionClosedAfterResponse();
    testConnectionClosedManually();
    // testDeadLock();
}
Kind Regards, Marek
1 Answer
I don't know enough Asio to give the kind of feedback I know you want, but here are some minor cleanups you could do:
- Don't `using namespace std`. You should probably avoid `using namespace` anything else, too, just for clarity.
- `virtual ~ConnectionDelegate() { }` could be `virtual ~ConnectionDelegate() = default;` instead. This represents your intent a little better.
- `~Server()` should be `~Server() override`, to indicate that it overrides a virtual member function. In general, you should use `override` wherever it's physically allowed by the language. (I think you do it right everywhere except on the destructors.)
- `Connection(string name,` and `Server(string name,` both unnecessarily make a copy of `string name`.
- All your constructors should be `explicit`, to tell the compiler that e.g. the braced pair `{"hello world", myIOContext}` should not be implicitly treated as (or implicitly converted to) a `Server` object, not even by accident.
- Personally, I find the use of typedefs for `ConnectionsList` and `ConnectionsIteratorMap` to be an unnecessary layer of indirection. I would rather see `std::list<std::weak_ptr<Connection>> _connections;` right there in-line. If I need a name for that type, I can just say `decltype(_connections)`.
- `default_delete<Connection>()(connection)` is a verbose way of saying `delete connection`. Be direct.
- `class executor executor()` is hella confusing. The fact that you had to say `class` there should have been a red flag that either `executor` is not the right name for the class, or `executor()` is not the right name for this method. Consider changing the name of the method to `get_executor()`, for example. I assume that you can't change the name of `class executor` because it isn't declared in this file; it must be coming from some Boost namespace that you `using`'ed, right? (Don't `using` namespaces!)
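To make the constructor points above concrete, here is a minimal sketch. `Context`, `ImplicitServer`, `ExplicitServer`, `implicitName` and `explicitName` are hypothetical names made up for illustration; `Context` merely stands in for the `io_context` so that the snippet compiles without Boost.

```cpp
#include <string>
#include <utility>

// Hypothetical stand-in for the io_context (assumption, keeps the sketch Boost-free).
struct Context {};

// Mirrors the constructor shape in the question: not explicit, and the
// by-value parameter is copied a second time into the member.
class ImplicitServer {
public:
    ImplicitServer(std::string name, Context& context)
        : _name(name)              // second copy of the string
        , _context(context) {}
    const std::string& name() const { return _name; }
    Context& context() const { return _context; }
private:
    std::string _name;
    Context& _context;
};

// Suggested shape: explicit, and the parameter is moved into the member.
class ExplicitServer {
public:
    explicit ExplicitServer(std::string name, Context& context)
        : _name(std::move(name))   // reuses the parameter's buffer, no second copy
        , _context(context) {}
    const std::string& name() const { return _name; }
    Context& context() const { return _context; }
private:
    std::string _name;
    Context& _context;
};

inline std::string implicitName(const ImplicitServer& server) { return server.name(); }
inline std::string explicitName(const ExplicitServer& server) { return server.name(); }

// Given `Context ctx;`:
//   implicitName({"hello world", ctx});                // compiles: the braced pair silently becomes a server
//   explicitName({"hello world", ctx});                // does not compile: the constructor is explicit
//   explicitName(ExplicitServer{"hello world", ctx});  // compiles: the conversion is spelled out
```

Taking the string by value and moving it is only one option; a `const std::string&` parameter would also avoid the extra copy for lvalue arguments.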
You skip a lot of opportunities to avoid copies via references and/or move semantics. For example, in `Server::connections()`, I would have written:
std::vector<std::shared_ptr<Connection>> connections() {
    std::lock_guard<std::mutex> lock(_mutex);
    std::vector<std::shared_ptr<Connection>> result;
    for (const auto& weakConnection : _connections) {
        if (auto sptr = weakConnection.lock()) {
            result.push_back(std::move(sptr));
        }
    }
    return result;
}
This avoids bumping the weak refcount by making `weakConnection` a reference instead of a copy, and then avoids bumping the strong refcount by using move instead of copy in `push_back`. Four atomic ops saved! (Not that this matters in real life, probably, but hey, welcome to Code Review.)
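The strong-refcount effect can be observed directly with `use_count()`. The following is a small single-threaded sketch; the helper names `useCountAfterCopy` and `useCountAfterMove` are hypothetical and exist only for this illustration.

```cpp
#include <memory>
#include <utility>
#include <vector>

// Copying into the vector leaves the temporary owning the object as well,
// so three owners exist at the point of measurement.
inline long useCountAfterCopy() {
    auto owner = std::make_shared<int>(42);
    std::vector<std::shared_ptr<int>> out;
    std::shared_ptr<int> tmp = owner;  // owners: owner, tmp
    out.push_back(tmp);                // owners: owner, tmp, out[0]
    return owner.use_count();
}

// Moving transfers tmp's ownership into the vector without touching the
// reference count; tmp is left empty.
inline long useCountAfterMove() {
    auto owner = std::make_shared<int>(42);
    std::vector<std::shared_ptr<int>> out;
    std::shared_ptr<int> tmp = owner;  // owners: owner, tmp
    out.push_back(std::move(tmp));     // owners: owner, out[0]
    return owner.use_count();
}
```

In the copy version the extra owner disappears again when `tmp` goes out of scope, which is exactly the increment/decrement pair that the move avoids.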
dispatch(_delegate->executor(),
         bind(&ConnectionDelegate::didReceiveResponse, _delegate, shared_from_this()));
I find the use of `bind` confusing, but I don't know for sure (and actually hope someone will comment and enlighten me): is `bind` needed here? It would certainly be clearer-to-read, faster-to-compile, and no-slower-at-runtime to write
dispatch(
    _delegate->executor(),
    [self = shared_from_this(), d = _delegate]() {
        d->didReceiveResponse(self);
    }
);
This would make it a bit clearer what's actually being copied (one `shared_ptr` keeping `*this` alive, and one raw pointer). In fact, I wonder whether we even need to stash the copy of the raw pointer; could we get away with this instead?
dispatch(
    _delegate->executor(),
    [self = shared_from_this()]() {
        self->_delegate->didReceiveResponse(self);
    }
);
Or do you expect that sometimes you'll get into the body of that lambda with `d != self->_delegate` and that's why you need the extra pointer?
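As a Boost-free illustration of the `bind` versus lambda comparison, the sketch below builds the same callback both ways and stores each in a `std::function<void()>`. `Delegate`, this cut-down `Connection`, and the `makeBindHandler`/`makeLambdaHandler` helpers are hypothetical stand-ins, not the classes from the question, and nothing here goes through an Asio executor.

```cpp
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Connection;

// Hypothetical delegate that just records which connections reported in.
struct Delegate {
    std::vector<std::string> received;
    void didReceiveResponse(std::shared_ptr<Connection> connection);
};

struct Connection : std::enable_shared_from_this<Connection> {
    std::string name;
    Delegate* delegate;

    Connection(std::string n, Delegate* d) : name(std::move(n)), delegate(d) {}

    // std::bind version: the bound arguments are stored inside the bind object.
    std::function<void()> makeBindHandler() {
        return std::bind(&Delegate::didReceiveResponse, delegate, shared_from_this());
    }

    // Lambda version: the capture list spells out what is stored, namely one
    // shared_ptr keeping *this alive and one raw pointer.
    std::function<void()> makeLambdaHandler() {
        return [self = shared_from_this(), d = delegate]() {
            d->didReceiveResponse(self);
        };
    }
};

inline void Delegate::didReceiveResponse(std::shared_ptr<Connection> connection) {
    received.push_back(connection->name);
}
```

Both handlers keep the connection alive for as long as they exist and call the same member function when invoked, so in this simplified setting the choice between them is about readability.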
I also wonder whether it'd be possible to use `std::chrono::seconds` instead of `boost::posix_time::seconds`. Can Boost.Asio interoperate with C++11 `std::chrono` these days?
_connectionIterators[connection.get()] = _connections.insert(_connections.end(), connection);
I feel like the "cleverness" here is on the wrong side of the equals sign. `_connections.insert(_connections.end(), connection)` seems like a verbose way of writing `_connections.push_back(connection)`. Vice versa, I'm used to seeing people replace `map[k] = v` with `map.emplace(k, v)` for performance and clarity. Remember that `map[k] = v` first default-constructs `map[k]`, and then assigns a new value into it.
Ah, I see, you need to use `insert` because `insert` returns an iterator and `push_back` doesn't.
But that just raises the question: Why are you trying to shoehorn two side-effects into one line? If we're allowed two lines, we just do `push_back` and then set `map.emplace(connection.get(), std::prev(_connections.end()))`. Or, heck, at that point I wouldn't really complain about
auto it = _connections.insert(_connections.end(), connection);
_connectionIterators.emplace(connection.get(), it);
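Having spotted the red flag, dig deeper: what's the difference between the one-liner and the clearer two-liner? Aha! The difference is what happens if `_connections.insert(...)` runs out of memory and throws. With the two-liner, `_connectionIterators` remains untouched. With the one-liner, you may first default-construct some dangerous garbage in `_connectionIterators[connection.get()]` and then propagate the exception. (Before C++17 the order in which the two sides of the assignment are evaluated is unspecified; since C++17 the right-hand side is sequenced first, so there the `insert` throws before the map is touched.)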
So I think there's a reasonable argument to be made in favor of the two-liner, just on general principles.
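A minimal, Boost-free sketch of the two-step pattern follows. `Registry`, `add` and `sizeAfterSubscript` are hypothetical names used only here, and strings stand in for the connection objects.

```cpp
#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>

// Hypothetical registry that keeps a list plus an index of iterators into it,
// the same shape as _connections / _connectionIterators in the question.
struct Registry {
    std::list<std::string> items;
    std::unordered_map<std::string, std::list<std::string>::iterator> index;

    void add(const std::string& key) {
        // Step 1: if this throws, index has not been touched at all.
        auto it = items.insert(items.end(), key);
        // Step 2: construct the map entry in place, with no default-constructed
        // iterator that is assigned afterwards.
        index.emplace(key, it);
    }
};

// operator[] inserts a value-initialized entry as a side effect of the lookup.
inline std::size_t sizeAfterSubscript() {
    std::unordered_map<std::string, int> m;
    (void)m["missing"];
    return m.size();
}
```

Note that this sketch does not roll back the list insertion if `emplace` itself throws; it only shows that the index is never left holding a default-constructed iterator.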
Again, this answer doesn't really address your main concern about RAII, but I hope it gives some food for thought anyway.
- Thank you very much for your response. You're right that I am mostly interested in someone reviewing my Asio usage and handling of the lifetime of the connections. Nevertheless I still find your comments valuable, especially the points about explicit constructors, default destructors and the executor() method. I will keep this question open in case some Asio expert comes across it. – MarekR, Oct 12, 2020 at 20:36