I think I made a mistake starting from the simplest version and working forward. This time I will present the final version, gather all the fixes from here, incorporate them into the code, and then reverse engineer the steps that got me there.
The parts of the server are the NisseServer itself, the JobQueue, the Store, the EventHandler, and the Pynt handler interface. This post deals only with NisseServer.
Basically, NisseServer holds the Store, the EventHandler, and the JobQueue. It also creates the actual Work objects that are passed to the JobQueue and registered with the EventHandler.
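Before diving into the code, here is a rough sketch of how I expect NisseServer to be driven. This is illustration only: MyPynt is a stand-in for a real Pynt handler (the assumed shape of Pynt is sketched after the header), TAS is the ThorsSocket namespace alias used in the code below, and I am assuming a TAS::ServerInfo can be built from a port number.
#include "NisseServer.h"

int main()
{
    MyPynt                          handler;    // Some Pynt implementation: handles one request on a stream.
    ThorsAnvil::Nisse::NisseServer  server{4};  // Four worker threads in the JobQueue.

    // Register a listening socket; each accepted connection is handed to "handler".
    server.listen(TAS::ServerInfo{8080}, handler);
    server.run();                                // Hand control to the EventHandler loop.
}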
NisseServer.h
#ifndef THORSANVIL_NISSE_SERVER_H
#define THORSANVIL_NISSE_SERVER_H

/*
 * NisseServer:
 *  Holds:
 *      JobQueue:       A set of background threads that perform any work set by the user.
 *      Store:          All state information needed by the server.
 *                      Storage is thread safe assuming:
 *                          Only the main thread adds new data.
 *                          Each thread only reads the object that it is acting on.
 *      EventHandler:   LibEvent wrapper.
 *                      It holds all the information needed to process a connection.
 *                      The server puts appropriate "lambdas" into the EventHandler to process a socket.
 */

#include "NisseConfig.h"
#include "JobQueue.h"
#include "Store.h"
#include "EventHandler.h"
#include "Pynt.h"
#include <ThorsSocket/SocketStream.h>

namespace ThorsAnvil::Nisse
{

class NisseServer
{
    JobQueue        jobQueue;
    Store           store;
    EventHandler    eventHandler;

    public:
        NisseServer(int workerCount = 1);
        void run();

        template<typename T>
        void listen(T listenerInit, Pynt& pynt);

    private:
        CoRoutine createStreamJob(StreamData& info);
        CoRoutine createAcceptJob(ServerData& info);
};

}

#endif
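The implementation below relies on only one thing from Pynt.h: a handleRequest() that is handed the socket stream and reports whether the request needs more data. The real declaration lives in Pynt.h; this is just the shape the code depends on, with names guessed where the code below does not show them.
// Sketch only: the real interface is in Pynt.h.
enum class PyntResult {Done, More};             // "Done" is a guessed name for the non-More value.

class Pynt
{
    public:
        virtual ~Pynt() = default;
        // Called when data is available on the stream.
        // Return More if the request needs more data before it can be completed.
        virtual PyntResult handleRequest(TAS::SocketStream& stream) = 0;
};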
NisseServer.cpp
#include "NisseServer.h"
#include "EventHandler.h"
using namespace ThorsAnvil::Nisse;
NisseServer::NisseServer(int workerCount)
: jobQueue{workerCount}
, store{}
, eventHandler{jobQueue, store}
{}
void NisseServer::run()
{
eventHandler.run();
}
CoRoutine NisseServer::createStreamJob(StreamData& info)
{
return CoRoutine
{
[&info](Yield& yield)
{
// Set the socket to work asynchronously.
info.stream.getSocket().setReadYield([&yield](){yield(TaskYieldState::RestoreRead);return true;});
info.stream.getSocket().setWriteYield([&yield](){yield(TaskYieldState::RestoreWrite);return true;});
// Return control to the creator.
// The next call will happen when there is data available on the file descriptor.
yield(TaskYieldState::RestoreRead);
try
{
PyntResult result = info.pynt->handleRequest(info.stream);
while (result == PyntResult::More)
{
yield(TaskYieldState::RestoreRead);
result = info.pynt->handleRequest(info.stream);
}
}
catch (...)
{
std::cerr << "Pynt Exception:\n";
}
// We are all done
// So indicate that we should tidy up state.
yield(TaskYieldState::Remove);
}
};
}
CoRoutine NisseServer::createAcceptJob(ServerData& info)
{
return CoRoutine
{
[&](Yield& yield)
{
// Set the socket to work asynchronously.
info.server.setYield([&yield](){yield(TaskYieldState::RestoreRead);return true;});
// Return control to the creator.
// The next call will happen when there is data available on the file descriptor.
yield(TaskYieldState::RestoreRead);
try
{
while (true)
{
TAS::Socket accept = info.server.accept(TAS::Blocking::No);
if (accept.isConnected())
{
// If everything worked then create a stream connection (see above)
// Passing the "Pynt" as the object that will handle the request.
// Note: The "Pynt" functionality is not run yet. The socket must be available to use.
eventHandler.add(TAS::SocketStream{std::move(accept)}, [&](StreamData& info){return createStreamJob(info);}, *info.pynt);
}
yield(TaskYieldState::RestoreRead);
}
}
catch (...)
{
std::cerr << "Pynt Exception:\n";
}
// We are all done
// So indicate that we should tidy up state.
yield(TaskYieldState::Remove);
}
};
}
template<typename T>
void NisseServer::listen(T listenerInit, Pynt& pynt)
{
TAS::Server server{listenerInit};
eventHandler.add(std::move(server), [&](ServerData& info){return createAcceptJob(info);}, pynt);
}
template void NisseServer::listen<TAS::SServerInfo>(TAS::SServerInfo listenerInit, Pynt& pynt);
template void NisseServer::listen<TAS::ServerInfo>(TAS::ServerInfo listenerInit, Pynt& pynt);
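One last note: because listen() is defined here in the .cpp file rather than the header, the explicit instantiations above decide which listener-info types callers can actually use: currently TAS::ServerInfo and TAS::SServerInfo (presumably the TLS flavour). Supporting another info type would need one more line; the type name below is invented purely for illustration.
// Hypothetical: only needed if ThorsSocket grew a new listener-info type.
template void NisseServer::listen<TAS::SomeNewServerInfo>(TAS::SomeNewServerInfo listenerInit, Pynt& pynt);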