I think I made a mistake by trying to start from the simplest version and work forward. This time I will present the final version, collect all the fixes suggested here and incorporate them into the code; then I can reverse-engineer the steps needed to get there.
The Parts of the server are:
This post deals only with Store.
This class is used to keep track of all state associated with open sockets.
I wanted to divide this class into two parts. Only the main thread can update the state, so all updates are applied via processUpdateRequest(). Any thread can call requestChange(T&& update), which queues a request to be handled by processUpdateRequest().
Store.h
#ifndef THORSANVIL_NISSE_STORE_H
#define THORSANVIL_NISSE_STORE_H
#include "NisseConfig.h"
#include "NisseUtil.h"
#include "Pynt.h"
#include "EventHandlerLibEvent.h"
#include <ThorsSocket/Server.h>
#include <ThorsSocket/SocketStream.h>
#include <variant>
#include <functional>
#include <map>
#include <vector>
#include <mutex>
/*
* The store can hold two types of object:
* ServerData: Data needed by the listeners.
* The ThorsSocket::Server object.
* StreamData: Data needed by a connected socket that is communicating.
* The ThorsSocket::SocketStream object.
*
* In addition, both types hold the following information:
* 2: Task A lambda that is executed.
* 3: CoRoutine A boost CoRoutine2 that allows code to yield if the socket blocks.
* 4: LibEvent objects These interact with the EventHandler to make sure that
* callbacks activate the Co-Routine.
*
* When an object is first created we also activate the LibEvent read listener of that object.
*
* To make sure that this object is thread safe state change is only done by the master thread
* via the method `processUpdateRequest()`. All other threads request state change
* via the `requestChange()` method that enqueues a request to be done by the main thread.
*/
namespace ThorsAnvil::Nisse
{
/*
* Data that can be stored.
*/
/*
* State stored for a listening socket (the ServerData alternative of StoreData).
* Member order matters: Store builds this with aggregate initialization.
*/
struct ServerData
{
// The listening server object, moved in from the create request.
TAS::Server server;
// Coroutine attached to this entry; Store assigns it from the request's coRoutineCreator.
CoRoutine coRoutine;
// Read event; Store calls add() on it when the entry is created and on a restore-read request.
Event readEvent;
// Address of the Pynt referenced by the create request.
// NOTE(review): looks non-owning -- confirm who owns the Pynt and how long it lives.
Pynt* pynt;
};
/*
* State stored for a connected socket (the StreamData alternative of StoreData).
* Member order matters: Store builds this with aggregate initialization.
*/
struct StreamData
{
// The connected socket stream, moved in from the create request.
TAS::SocketStream stream;
// Coroutine attached to this entry; Store assigns it from the request's coRoutineCreator.
CoRoutine coRoutine;
// Read event; Store calls add() on it when the entry is created and on a restore-read request.
Event readEvent;
// Write event; Store calls add() on it only for a restore-write request.
Event writeEvent;
// Address of the Pynt referenced by the create request.
// NOTE(review): looks non-owning -- confirm who owns the Pynt and how long it lives.
Pynt* pynt;
};
// A store entry is either a listener (ServerData) or a connected stream (StreamData).
// The order of the alternatives determines the variant index; do not reorder casually.
using StoreData = std::variant<ServerData, StreamData>;
/*
* Change request objects.
* The following are objects that can be enqueued by requestChange()
*/
/*
* Request to register a listening socket under the key `fd`.
* When applied, Store moves `server` and `readEvent` into a ServerData entry,
* stores the address of `pynt`, builds the coroutine by calling `coRoutineCreator`
* with that entry, and then calls add() on the entry's read event.
*/
struct StateUpdateCreateServer
{
// Key under which the entry is stored.
int fd;
TAS::Server server;
// Called with the newly stored ServerData; its result becomes the entry's coroutine.
ServerCreator coRoutineCreator;
Event readEvent;
// Held by reference and its address is kept in the entry, so the referenced Pynt
// must still be alive when the request is applied (and presumably afterwards -- confirm).
Pynt& pynt;
};
/*
* Request to register a connected socket under the key `fd`.
* When applied, Store moves `stream`, `readEvent` and `writeEvent` into a StreamData entry,
* stores the address of `pynt`, builds the coroutine by calling `coRoutineCreator`
* with that entry, and then calls add() on the entry's read event.
*/
struct StateUpdateCreateStream
{
// Key under which the entry is stored.
int fd;
TAS::SocketStream stream;
// Called with the newly stored StreamData; its result becomes the entry's coroutine.
StreamCreator coRoutineCreator;
Event readEvent;
Event writeEvent;
// Held by reference and its address is kept in the entry, so the referenced Pynt
// must still be alive when the request is applied (and presumably afterwards -- confirm).
Pynt& pynt;
};
/*
* Request to erase the entry stored under `fd`.
* Applying it when no such entry exists is a no-op (map erase by key).
*/
struct StateUpdateRemove
{
int fd;
};
/*
* Request to call add() on the read event of the entry stored under `fd`.
* Ignored when no entry exists for `fd`.
*/
struct StateUpdateRestoreRead
{
int fd;
};
/*
* Request to call add() on the write event of the entry stored under `fd`.
* Only StreamData entries have a write event; for a ServerData entry, or when
* no entry exists for `fd`, applying the request does nothing.
*/
struct StateUpdateRestoreWrite
{
int fd;
};
// Any request that can be queued with Store::requestChange(); Store dispatches on the
// held alternative with std::visit. The order of alternatives fixes the variant index.
using StateUpdate = std::variant<StateUpdateCreateServer, StateUpdateCreateStream, StateUpdateRemove, StateUpdateRestoreRead, StateUpdateRestoreWrite>;
/*
* The store data
*/
/*
 * Store: maps a file descriptor to the state (StoreData) kept for that socket.
 *
 *  getStoreData()          Look up the entry for an fd (throws if there is none).
 *  requestChange()         Queue a StateUpdate; the queue is protected by `updateMutex`.
 *  processUpdateRequest()  Drain the queue, applying each request to `data`.
 *
 * The private operator() overloads are the per-request handlers that
 * processUpdateRequest() reaches through the ApplyUpdate visitor.
 */
class Store
{
    public:
        StoreData&  getStoreData(int fd);
        void        processUpdateRequest();
        template<typename T>
        void        requestChange(T&& update);

    private:
        /*
         * Visitor handed to std::visit: forwards whichever request type the
         * variant holds to the matching Store::operator() overload.
         */
        struct ApplyUpdate
        {
            Store& store;
            ApplyUpdate(Store& store)
                : store(store)
            {}
            template<typename Request>
            void operator()(Request& update)    {store(update);}
        };

        // One handler per alternative of StateUpdate.
        void operator()(StateUpdateCreateServer& update);
        void operator()(StateUpdateCreateStream& update);
        void operator()(StateUpdateRemove& update);
        void operator()(StateUpdateRestoreRead& update);
        void operator()(StateUpdateRestoreWrite& update);

        // Placeholder used to fill the coRoutine member while an entry is being built.
        static CoRoutine            invalid;
        // fd => stored state.
        std::map<int, StoreData>    data;
        // Requests waiting to be applied by processUpdateRequest().
        std::vector<StateUpdate>    updates;
        // Guards `updates`.
        std::mutex                  updateMutex;
};
}
#endif
Store.cpp
#include "Store.h"
#include <stdexcept>
#include <utility>
using namespace ThorsAnvil::Nisse;
/*
* There is no default constructor for CoRoutine.
* But a non-coroutine object is allowed to exist.
* So we keep a single placeholder co-routine, built from a lambda with an empty body,
* that the create handlers move from (`std::move(invalid)`) to fill the `coRoutine`
* member of a new entry until the real co-routine is assigned.
*
* NOTE(review): after the first create request this object has been moved from, so
* every later create request moves from a moved-from CoRoutine. Confirm that a
* moved-from CoRoutine is safe to move from again and to destroy.
*/
CoRoutine Store::invalid{[](Yield&){}};
/*
 * Return the state stored for `fd`.
 *
 * Throws std::runtime_error when nothing is stored under `fd`.
 * Reads `data` without taking `updateMutex`.
 */
StoreData& Store::getStoreData(int fd)
{
    if (auto entry = data.find(fd); entry != data.end())
    {
        return entry->second;
    }
    throw std::runtime_error("Invalid Request: Exit applications");
}
/*
 * Queue a state-change request to be applied later by processUpdateRequest().
 *
 * `update` is a forwarding reference, so it is passed on with std::forward
 * (not std::move): an rvalue argument is moved into the queue, and an lvalue
 * argument would never be silently moved-from.
 * The queue is modified only while `updateMutex` is held.
 *
 * The template is defined in this translation unit, so only the explicit
 * instantiations listed below can be called from other files.
 */
template<typename T>
void Store::requestChange(T&& update)
{
    std::scoped_lock lock(updateMutex);
    updates.emplace_back(std::forward<T>(update));
}
template void Store::requestChange<StateUpdateCreateServer>(StateUpdateCreateServer&& update);
template void Store::requestChange<StateUpdateCreateStream>(StateUpdateCreateStream&& update);
template void Store::requestChange<StateUpdateRemove>(StateUpdateRemove&& update);
template void Store::requestChange<StateUpdateRestoreRead>(StateUpdateRestoreRead&& update);
template void Store::requestChange<StateUpdateRestoreWrite>(StateUpdateRestoreWrite&& update);
void Store::processUpdateRequest()
{
std::unique_lock lock(updateMutex);
ApplyUpdate updater{*this};
for (auto& update: updates)
{
std::visit(updater, update);
/* This visit calls one of the operators below based on the type of the request */
}
updates.clear();
}
/*
 * Handle a create-server request.
 *
 * Inserts (or replaces) the entry for `update.fd` with a ServerData built from the
 * request, using the placeholder co-routine. Once the entry is in the map, the real
 * co-routine is created from it and the read event is added.
 */
void Store::operator()(StateUpdateCreateServer& update)
{
    StoreData& slot = data.insert_or_assign(update.fd,
                                            ServerData{std::move(update.server),
                                                       std::move(invalid),
                                                       std::move(update.readEvent),
                                                       &update.pynt
                                                      }).first->second;

    // The creator receives the stored object, so it is called only after insertion.
    ServerData& server = std::get<ServerData>(slot);
    server.coRoutine = update.coRoutineCreator(server);
    server.readEvent.add();
}
/*
 * Handle a create-stream request.
 *
 * Inserts (or replaces) the entry for `update.fd` with a StreamData built from the
 * request, using the placeholder co-routine. Once the entry is in the map, the real
 * co-routine is created from it and the read event is added. The write event is
 * stored but not added here.
 */
void Store::operator()(StateUpdateCreateStream& update)
{
    StoreData& slot = data.insert_or_assign(update.fd,
                                            StreamData{std::move(update.stream),
                                                       std::move(invalid),
                                                       std::move(update.readEvent),
                                                       std::move(update.writeEvent),
                                                       &update.pynt
                                                      }).first->second;

    // The creator receives the stored object, so it is called only after insertion.
    StreamData& stream = std::get<StreamData>(slot);
    stream.coRoutine = update.coRoutineCreator(stream);
    stream.readEvent.add();
}
/*
 * Handle a remove request: drop the entry stored under `update.fd`, if there is one.
 */
void Store::operator()(StateUpdateRemove& update)
{
    const auto entry = data.find(update.fd);
    if (entry != data.end())
    {
        data.erase(entry);
    }
}
/*
 * Handle a restore-read request: add the read event of the entry stored under
 * `update.fd`. Both ServerData and StreamData have a read event, so one generic
 * visitor covers both. Nothing happens when there is no entry for the fd.
 */
void Store::operator()(StateUpdateRestoreRead& update)
{
    const auto entry = data.find(update.fd);
    if (entry == data.end())
    {
        return;
    }
    std::visit([](auto& stored){stored.readEvent.add();}, entry->second);
}
/*
 * Handle a restore-write request: add the write event of the entry stored under
 * `update.fd`. Only StreamData has a write event, so a ServerData entry is left
 * untouched. Nothing happens when there is no entry for the fd.
 */
void Store::operator()(StateUpdateRestoreWrite& update)
{
    struct RestoreWrite
    {
        void operator()(ServerData&)            {}
        void operator()(StreamData& stream)     {stream.writeEvent.add();}
    };

    const auto entry = data.find(update.fd);
    if (entry == data.end())
    {
        return;
    }
    std::visit(RestoreWrite{}, entry->second);
}