0
\$\begingroup\$

I think I made a mistake trying to start from the simplest version and working forward. This time I will present the final version, get all the fixes from here and incorporate them into the code; then I can reverse engineer the steps to get there.

The Parts of the server are:

This post deals only with Store.

This class is used to keep track of all state associated with open sockets.

I wanted to divide this class into two parts. Only the main thread can update the state, so all updates are done via processUpdateRequest(). Any thread can call requestChange(T&& update), which queues a request to be handled by processUpdateRequest().

Store.h

#ifndef THORSANVIL_NISSE_STORE_H
#define THORSANVIL_NISSE_STORE_H
#include "NisseConfig.h"
#include "NisseUtil.h"
#include "Pynt.h"
#include "EventHandlerLibEvent.h"
#include <ThorsSocket/Server.h>
#include <ThorsSocket/SocketStream.h>
#include <variant>
#include <functional>
#include <map>
#include <vector>
#include <mutex>
/*
 * The store can hold two types of object:
 * ServerData: Data needed by the listeners.
 * The ThorsSocket::Server object.
 * StreamData: Data needed by a connected socket that is communicating.
 * The ThorsSocket::SocketStream object.
 *
 * Additionally, both types hold the following information.
 * 2: Task A lambda that is executed.
 * 3: CoRoutine A boost CoRoutine2 that allows code to yield if the socket blocks.
 * 4: LibEvent objects These interact with the EventHandler to make sure that
 * callbacks activate the Co-Routine.
 *
 * When an object is first created we also activate the LibEvent read listener of that object.
 *
 * To make sure that this object is thread safe state change is only done by the master thread
 * via the method `processUpdateRequest()`. All other threads request state change
 * via the `requestChange()` method that enqueues a request to be done by the main thread.
 */
namespace ThorsAnvil::Nisse
{
/*
 * Data that can be stored.
 */
// State stored for a listening socket (the ServerData alternative of StoreData).
// This is an aggregate: Store brace-initializes it in member order, so the
// order of the fields below is part of its contract.
struct ServerData
{
 TAS::Server server;    // The ThorsSocket::Server object for this listener.
 CoRoutine coRoutine;   // Co-routine for this entry; Store replaces it with the result of the request's coRoutineCreator.
 Event readEvent;       // Read event; Store calls add() on it at creation and on a restore-read request.
 Pynt* pynt;            // Non-owning pointer; Store sets it to the address of the Pynt referenced by the create request.
};
// State stored for a connected socket (the StreamData alternative of StoreData).
// This is an aggregate: Store brace-initializes it in member order, so the
// order of the fields below is part of its contract.
struct StreamData
{
 TAS::SocketStream stream;  // The ThorsSocket::SocketStream object for this connection.
 CoRoutine coRoutine;       // Co-routine for this entry; Store replaces it with the result of the request's coRoutineCreator.
 Event readEvent;           // Read event; Store calls add() on it at creation and on a restore-read request.
 Event writeEvent;          // Write event; Store calls add() on it only on a restore-write request.
 Pynt* pynt;                // Non-owning pointer; Store sets it to the address of the Pynt referenced by the create request.
};
// One entry in the store: either listener state or connected-stream state.
using StoreData = std::variant<ServerData, StreamData>;
/*
 * Change request objects.
 * The following are objects that can be enqueued by requestChange()
 */
// Request to register a listening socket in the Store.
// When applied, the Store moves `server` and `readEvent` into a new ServerData
// stored under `fd`, builds the co-routine, and calls add() on the read event.
struct StateUpdateCreateServer
{
 int fd;                          // Key under which the entry is stored.
 TAS::Server server;              // Moved into the stored ServerData.
 ServerCreator coRoutineCreator;  // Called with the stored ServerData; its result becomes the entry's coRoutine.
 Event readEvent;                 // Moved into the stored ServerData.
 Pynt& pynt;                      // Its address is stored in the entry; presumably must outlive the entry - TODO confirm.
};
// Request to register a connected socket in the Store.
// When applied, the Store moves `stream`, `readEvent` and `writeEvent` into a new
// StreamData stored under `fd`, builds the co-routine, and calls add() on the read event.
struct StateUpdateCreateStream
{
 int fd;                          // Key under which the entry is stored.
 TAS::SocketStream stream;        // Moved into the stored StreamData.
 StreamCreator coRoutineCreator;  // Called with the stored StreamData; its result becomes the entry's coRoutine.
 Event readEvent;                 // Moved into the stored StreamData.
 Event writeEvent;                // Moved into the stored StreamData (not added at creation).
 Pynt& pynt;                      // Its address is stored in the entry; presumably must outlive the entry - TODO confirm.
};
// Request to erase the entry stored under `fd` (nothing happens if there is none).
struct StateUpdateRemove
{
 int fd;    // Key of the entry to erase.
};
// Request to call add() on the read event of the entry stored under `fd`.
// Ignored when no entry exists for `fd`.
struct StateUpdateRestoreRead
{
 int fd;    // Key of the entry whose read event is re-added.
};
// Request to call add() on the write event of the entry stored under `fd`.
// Ignored when no entry exists for `fd`; has no effect on a ServerData entry,
// which has no write event.
struct StateUpdateRestoreWrite
{
 int fd;    // Key of the entry whose write event is re-added.
};
// Any one of the change requests that can be queued with Store::requestChange().
using StateUpdate = std::variant<StateUpdateCreateServer, StateUpdateCreateStream, StateUpdateRemove, StateUpdateRestoreRead, StateUpdateRestoreWrite>;
/*
 * The store data.
 *
 * Maps a file descriptor to the state (StoreData) of the socket it belongs to.
 * Changes to the map are not made directly: they are queued with requestChange()
 * and applied later, in order, by processUpdateRequest().
 */
class Store
{
    // Placeholder co-routine used to construct entries before their real co-routine exists.
    static CoRoutine            invalid;
    // Socket state, keyed by file descriptor.
    std::map<int, StoreData>    data;
    // Queued change requests; guarded by updateMutex.
    std::vector<StateUpdate>    updates;
    std::mutex                  updateMutex;

    public:
        // Returns the entry stored under `fd`.
        // Throws std::runtime_error when there is no entry for `fd`.
        StoreData&  getStoreData(int fd);

        // Applies every queued change request, in the order it was queued.
        void        processUpdateRequest();

        // Queues `update` to be applied by the next processUpdateRequest().
        // Defined in Store.cpp, which explicitly instantiates it for rvalues of
        // each alternative of StateUpdate.
        template<typename T>
        void        requestChange(T&& update);

    private:
        // Visitor handed to std::visit: forwards whichever request alternative
        // is active to the matching Store::operator() overload below.
        struct ApplyUpdate
        {
            Store&  store;

            explicit ApplyUpdate(Store& store)
                : store(store)
            {}

            template<typename Update>
            void operator()(Update& update)     {store(update);}
        };

        // One handler per request type.
        void operator()(StateUpdateCreateServer& update);
        void operator()(StateUpdateCreateStream& update);
        void operator()(StateUpdateRemove& update);
        void operator()(StateUpdateRestoreRead& update);
        void operator()(StateUpdateRestoreWrite& update);
};
}
#endif

Store.cpp

#include "Store.h"
#include <stdexcept>
#include <utility>
using namespace ThorsAnvil::Nisse;
/*
 * There is no default constructor for CoRoutine.
 * But a non-coroutine object is allowed to exist.
 * So we have a single invalid co-routine that can be used
 * to initialize non-coroutine objects until we are ready to initialize them.
 *
 * Its body does nothing. The create handlers pass std::move(invalid) when
 * building a new entry and then immediately overwrite that entry's coRoutine
 * with the real one.
 * NOTE(review): this object is therefore moved-from after the first create
 * request; this relies on a moved-from CoRoutine being safe to move from
 * again - confirm against the CoRoutine type.
 */
CoRoutine Store::invalid{[](Yield&){}};
/*
 * Look up the state stored for `fd`.
 * Returns a reference to the stored entry.
 * Throws std::runtime_error if nothing is stored under `fd`.
 */
StoreData& Store::getStoreData(int fd)
{
    if (auto entry = data.find(fd); entry != data.end())
    {
        return entry->second;
    }
    throw std::runtime_error("Invalid Request: Exit applications");
}
/*
 * Queue a change request; it is applied by the next processUpdateRequest().
 * The queue is modified only while updateMutex is held.
 *
 * `update` is a forwarding reference, so it is passed on with std::forward
 * (not std::move): an rvalue is moved into the queue, and an lvalue would be
 * copied rather than silently moved-from.
 */
template<typename T>
void Store::requestChange(T&& update)
{
    std::unique_lock    lock(updateMutex);
    updates.emplace_back(std::forward<T>(update));
}

/*
 * The template is defined in this file, so it is explicitly instantiated for
 * every request type (rvalue arguments only).
 */
template void Store::requestChange<StateUpdateCreateServer>(StateUpdateCreateServer&& update);
template void Store::requestChange<StateUpdateCreateStream>(StateUpdateCreateStream&& update);
template void Store::requestChange<StateUpdateRemove>(StateUpdateRemove&& update);
template void Store::requestChange<StateUpdateRestoreRead>(StateUpdateRestoreRead&& update);
template void Store::requestChange<StateUpdateRestoreWrite>(StateUpdateRestoreWrite&& update);
void Store::processUpdateRequest()
{
 std::unique_lock lock(updateMutex);
 ApplyUpdate updater{*this};
 for (auto& update: updates)
 {
 std::visit(updater, update);
 /* This visit calls one of the operators below based on the type of the request */
 }
 updates.clear();
}
/*
 * Handle a create-server request.
 *
 * Stores a ServerData under update.fd (replacing any entry already stored
 * under that key), then creates the co-routine and adds the read event.
 * The entry is built with the placeholder co-routine first because the
 * creator needs a reference to the stored ServerData.
 */
void Store::operator()(StateUpdateCreateServer& update)
{
    auto inserted = data.insert_or_assign(update.fd,
                                          ServerData{std::move(update.server),
                                                     std::move(invalid),
                                                     std::move(update.readEvent),
                                                     &update.pynt
                                                    });
    // Named so that it does not shadow the member `data`.
    ServerData& serverData = std::get<ServerData>(inserted.first->second);
    serverData.coRoutine = update.coRoutineCreator(serverData);
    serverData.readEvent.add();
}
/*
 * Handle a create-stream request.
 *
 * Stores a StreamData under update.fd (replacing any entry already stored
 * under that key), then creates the co-routine and adds the read event.
 * The write event is stored but not added here. The entry is built with the
 * placeholder co-routine first because the creator needs a reference to the
 * stored StreamData.
 */
void Store::operator()(StateUpdateCreateStream& update)
{
    auto inserted = data.insert_or_assign(update.fd,
                                          StreamData{std::move(update.stream),
                                                     std::move(invalid),
                                                     std::move(update.readEvent),
                                                     std::move(update.writeEvent),
                                                     &update.pynt
                                                    });
    // Named so that it does not shadow the member `data`.
    StreamData& streamData = std::get<StreamData>(inserted.first->second);
    streamData.coRoutine = update.coRoutineCreator(streamData);
    streamData.readEvent.add();
}
/*
 * Handle a remove request: drop the entry stored under update.fd.
 * Erasing by key does nothing when no such entry exists.
 */
void Store::operator()(StateUpdateRemove& update)
{
    const int removedFd = update.fd;
    data.erase(removedFd);
}
/*
 * Handle a restore-read request: add the read event of the entry stored
 * under update.fd. Both kinds of entry have a read event, so one generic
 * visitor covers them. The request is ignored when there is no such entry.
 */
void Store::operator()(StateUpdateRestoreRead& update)
{
    const auto entry = data.find(update.fd);
    if (entry == data.end())
    {
        return;
    }
    std::visit([](auto& stored) {stored.readEvent.add();}, entry->second);
}
/*
 * Handle a restore-write request: add the write event of the entry stored
 * under update.fd. Only a StreamData has a write event, so the request has
 * no effect on a ServerData entry. It is ignored when there is no such entry.
 */
void Store::operator()(StateUpdateRestoreWrite& update)
{
    struct RestoreWrite
    {
        void operator()(ServerData&)            {}
        void operator()(StreamData& stream)     {stream.writeEvent.add();}
    };
    auto find = data.find(update.fd);
    if (find != data.end()) {
        std::visit(RestoreWrite{}, find->second);
    }
}
asked Oct 15, 2024 at 5:40
\$\endgroup\$

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.