I think I made a mistake trying to start from the simplest version and working forward. This time I will present the final version, get all the fixes from here and incorporate them into the code; then I can reverse engineer the steps to get there.
The Parts of the server are:
This post deals only with EventHandler.
The EventHandlerLibEvent.h
provides RAII wrappers for the LibEvent types that I need. The EventHandler
uses LibEvent to set callbacks and handlers. There is a timer that updates the Store
state, and any event on a file descriptor causes a job to be added to the JobQueue
.
EventHandlerLibEvent.h
#ifndef THORSANVIL_NISSE_EVENT_HANDLER_LIBEVENT_H
#define THORSANVIL_NISSE_EVENT_HANDLER_LIBEVENT_H
/*
* Wrapper classes to give LibEvent types proper RAII characteristics.
*
* EventBase is not copyable or movable. libEvent event_base
* Event is moveable. libEvent event
*/
#include "NisseConfig.h"
#include <event2/event.h>
#include <stdexcept>
#include <utility>
namespace ThorsAnvil::Nisse
{
// Short aliases for the libEvent C types wrapped by the classes in this header.
using LibEventEventBase = ::event_base;
using LibEventEvent = ::event;
using LibEventTimeOut = ::timeval;
// Declared here because Event's constructors take an EventHandler by reference.
class EventHandler;
// The kind of file descriptor activity an Event listens for.
// The enumerator values are libEvent's EV_READ / EV_WRITE flags so they can be
// cast straight to the `short` that event_new() expects.
enum class EventType : short{Read = EV_READ, Write = EV_WRITE};
// Forward declaration so EventBase can name Event as a friend.
class Event;
/*
 * Owning wrapper around a libEvent event_base.
 *
 * The event_base is created in the constructor and released in the destructor.
 * The object is neither copyable nor movable, so the raw pointer handed to
 * libEvent (and to every Event built from this base) stays valid for the whole
 * lifetime of the EventBase.
 *
 * Throws std::runtime_error from the constructor if libEvent cannot create
 * the event_base.
 */
class EventBase
{
    // Event's constructors need the raw event_base to register themselves.
    friend class Event;
    LibEventEventBase*      eventBase;
    public:
        EventBase()
            : eventBase(event_base_new())
        {
            // event_base_new() signals failure by returning nullptr.
            // Fail here rather than passing a null base to
            // event_base_loop()/event_base_free() later on.
            if (eventBase == nullptr) {
                throw std::runtime_error("ThorsAnvil::Nisse::EventBase: event_base_new() failed");
            }
        }
        ~EventBase()
        {
            // The constructor guarantees eventBase is non-null.
            event_base_free(eventBase);
        }

        EventBase(EventBase const&)             = delete;
        EventBase(EventBase&&)                  = delete;
        EventBase& operator=(EventBase const&)  = delete;
        EventBase& operator=(EventBase&&)       = delete;

        /*
         * Run the libEvent loop on the calling thread.
         * EVLOOP_NO_EXIT_ON_EMPTY is passed so the loop is asked to keep
         * running even when no events are currently registered.
         * The result of event_base_loop() is not reported to the caller.
         */
        void run()
        {
            event_base_loop(eventBase, EVLOOP_NO_EXIT_ON_EMPTY);
        }
};
/*
 * Owning, move-only wrapper around a libEvent event.
 *
 * A default constructed (or moved-from) Event owns nothing.
 * The two non-default constructors are defined in EventHandler.cpp because
 * they register the C callbacks declared there.
 *
 * Precondition for add(): the Event must own a libEvent event
 * (i.e. it is not default constructed and has not been moved from).
 */
class Event
{
    static constexpr int    microsecondsPerSecond = 1000000;

    LibEventEvent*          event;
    public:
        // Timer event: fires controlTimerCallback for `eventHandler`.
        Event(EventBase& eventBase, EventHandler& eventHandler);
        // File descriptor event: fires eventCallback for `eventHandler` on `type` activity on `fd`.
        Event(EventBase& eventBase, int fd, EventType type, EventHandler& eventHandler);

        Event() noexcept
            : event(nullptr)
        {}
        Event(Event&& move) noexcept
            : event(std::exchange(move.event, nullptr))
        {}
        Event& operator=(Event&& move) noexcept
        {
            if (this != &move) {
                // Release the event we currently own before taking over the
                // other one; otherwise it would be leaked.
                if (event) {
                    event_free(event);
                }
                event = std::exchange(move.event, nullptr);
            }
            return *this;
        }
        ~Event()
        {
            if (event) {
                event_free(event);
            }
        }

        Event(Event const&)             = delete;
        Event& operator=(Event const&)  = delete;

        /*
         * Make the event pending with no timeout.
         * The result of event_add() is not reported to the caller.
         */
        void add()
        {
            event_add(event, nullptr);
        }
        /*
         * Make the (timer) event pending so it fires after `microsecondsPause` microseconds.
         * The pause is split into whole seconds and the remaining microseconds
         * so that tv_usec stays below one second even for long pauses.
         */
        void add(int microsecondsPause)
        {
            LibEventTimeOut timeout{};
            timeout.tv_sec  = microsecondsPause / microsecondsPerSecond;
            timeout.tv_usec = microsecondsPause % microsecondsPerSecond;
            evtimer_add(event, &timeout);
        }
};
}
#endif
EventHandler.h
#ifndef THORSANVIL_NISSE_EVENT_HANDLER_H
#define THORSANVIL_NISSE_EVENT_HANDLER_H
/*
* A thin wrapper on libEvent to C++ it.
*
* When a socket listener is first created via add() we store all data in the Store object.
* When this has been created it adds the `ReadEvent` to libEvent to listen for any data.
*
* When (if) a socket event is triggered we save a lambda on the JobQueue via addJob() that will be
* executed by a thread. The lambda restarts the CoRoutine, which will yield one of
* three values.
*
* When the code yields one of three situations happens:
* * TaskYieldState::RestoreRead We restore the read listener waiting for more data.
* * TaskYieldState::RestoreWrite We restore the write listener waiting for space to write.
* * TaskYieldState::Remove We destroy the socket and all associated data.
*
* Note: This data is never destroyed immediately because the code may be executing on any thread.
* Instead a request is queued on the `Store` object. The main thread will then be used
* to clean up data (See Store for details).
*/
#include "NisseConfig.h"
#include "NisseUtil.h"
#include "EventHandlerLibEvent.h"
#include "Store.h"
/*
* C-Callback registered with LibEvent
*/
// Registered for file descriptor events; `data` is the EventHandler passed to event_new().
extern "C" void eventCallback(evutil_socket_t fd, short eventType, void* data);
// Registered for the control timer; `data` is the EventHandler passed to evtimer_new().
extern "C" void controlTimerCallback(evutil_socket_t fd, short eventType, void* data);
namespace ThorsAnvil::Nisse
{
// Forward declarations of the project types named in EventHandler's interface.
// JobQueue is only held by reference here; EventHandler.cpp includes JobQueue.h.
class JobQueue;
class Store;
// Alternatives visited by EventHandler::ApplyEvent.
// NOTE(review): ApplyEvent reads info.coRoutine inline, so the full definitions
// presumably arrive via "Store.h" -- confirm.
struct StreamData;
struct ServerData;
/*
 * C++ front end for the libEvent loop.
 *
 * Sockets are registered with add(); activity on them is turned into jobs on
 * the JobQueue, and a repeating control timer lets the thread running run()
 * apply the state changes that have been queued on the Store.
 */
class EventHandler
{
    // The C callbacks registered with libEvent forward into the private
    // eventHandle() / controlTimerAction() members below.
    friend void ::eventCallback(evutil_socket_t fd, short eventType, void* data);
    friend void ::controlTimerCallback(evutil_socket_t fd, short eventType, void* data);

    static constexpr int controlTimerPause = 1000;  // microseconds between iterations.

    JobQueue&       jobQueue;
    Store&          store;
    // Declaration order matters: `timer` is constructed from `eventBase`,
    // so `eventBase` must be declared (and therefore initialized) first.
    EventBase       eventBase;
    Event           timer;

    public:
        EventHandler(JobQueue& jobQueue, Store& store);

        // Runs the event loop on the calling thread.
        void run();

        // Queue the creation of a listening server / a connected stream on the Store.
        void add(TAS::Server&& stream, ServerCreator&& creator, Pynt& pynt);
        void add(TAS::SocketStream&& stream, StreamCreator&& creator, Pynt& pynt);

    private:
        // Entry point used by ::eventCallback.
        void eventHandle(int fd, EventType type);

        // Entry point used by ::controlTimerCallback.
        void controlTimerAction();

        /*
         * Visitor applied to the StoreData of the descriptor that had an event.
         * Servers always get a job; streams only get one if the descriptor
         * passes checkFileDescriptorOK().
         */
        struct ApplyEvent
        {
            EventHandler&   handler;
            int             fd;
            EventType       type;

            ApplyEvent(EventHandler& handler, int fd, EventType type)
                : handler(handler)
                , fd(fd)
                , type(type)
            {}

            void operator()(ServerData& info)
            {
                handler.addJob(info.coRoutine, fd);
            }
            void operator()(StreamData& info)
            {
                if (handler.checkFileDescriptorOK(fd, type)) {
                    handler.addJob(info.coRoutine, fd);
                }
            }
        };

        bool checkFileDescriptorOK(int fd, EventType type);
        void addJob(CoRoutine& work, int fd);
};
}
#endif
EventHandler.cpp
#include "EventHandler.h"
#include "NisseUtil.h"
#include "JobQueue.h"
#include "Store.h"
/*
* C Callback functions.
* Simply decode the data into an EventHandler and call the C++ functions.
*/
// libEvent callback for file descriptor events.
// `data` is the EventHandler pointer that was handed to event_new().
void eventCallback(evutil_socket_t fd, short eventType, void* data)
{
    using ThorsAnvil::Nisse::EventHandler;
    using ThorsAnvil::Nisse::EventType;

    auto* handler = static_cast<EventHandler*>(data);
    handler->eventHandle(fd, static_cast<EventType>(eventType));
}
// libEvent callback for the control timer; the descriptor and flags are unused.
// `data` is the EventHandler pointer that was handed to evtimer_new().
void controlTimerCallback(evutil_socket_t, short, void* data)
{
    using ThorsAnvil::Nisse::EventHandler;

    auto* handler = static_cast<EventHandler*>(data);
    handler->controlTimerAction();
}
using namespace ThorsAnvil::Nisse;
/*
* EventLib wrapper. Set up C-Function callbacks
*/
/*
 * Create a file descriptor event on `eventBase`.
 * When `type` activity is seen on `fd`, libEvent calls eventCallback with
 * `eventHandler` as its user data.
 *
 * Throws std::runtime_error if libEvent cannot allocate the event, so that a
 * null event is never stored and later passed to event_add().
 */
Event::Event(EventBase& eventBase, int fd, EventType type, EventHandler& eventHandler)
    : event{event_new(eventBase.eventBase, fd, static_cast<short>(type), &eventCallback, &eventHandler)}
{
    if (event == nullptr) {
        throw std::runtime_error("ThorsAnvil::Nisse::Event: event_new() failed");
    }
}
/*
 * Create a timer event on `eventBase`.
 * When the timer fires, libEvent calls controlTimerCallback with
 * `eventHandler` as its user data.
 *
 * Throws std::runtime_error if libEvent cannot allocate the event, so that a
 * null event is never stored and later passed to evtimer_add().
 */
Event::Event(EventBase& eventBase, EventHandler& eventHandler)
    : event{evtimer_new(eventBase.eventBase, controlTimerCallback, &eventHandler)}
{
    if (event == nullptr) {
        throw std::runtime_error("ThorsAnvil::Nisse::Event: evtimer_new() failed");
    }
}
EventHandler::EventHandler(JobQueue& jobQueue, Store& store)
: jobQueue(jobQueue)
, store(store)
, timer(eventBase, *this)
{
timer.add(controlTimerPause);
}
void EventHandler::run()
{
eventBase.run();
}
/*
 * Register a listening server.
 * Nothing is changed directly; a StateUpdateCreateServer request carrying the
 * server, its creator and a read event for its descriptor is queued on the Store.
 */
void EventHandler::add(TAS::Server&& server, ServerCreator&& serverCreator, Pynt& pynt)
{
    // Read the descriptor before the server is moved into the request.
    int const socketId = server.socketId();

    store.requestChange(
        StateUpdateCreateServer{
            socketId,
            std::move(server),
            std::move(serverCreator),
            Event{eventBase, socketId, EventType::Read, *this},
            pynt
        }
    );
}
/*
 * Register a connected stream.
 * Nothing is changed directly; a StateUpdateCreateStream request carrying the
 * stream, its creator and both a read and a write event for its descriptor
 * is queued on the Store.
 */
void EventHandler::add(TAS::SocketStream&& stream, StreamCreator&& streamCreator, Pynt& pynt)
{
    // Read the descriptor before the stream is moved into the request.
    int const socketId = stream.getSocket().socketId();

    store.requestChange(
        StateUpdateCreateStream{
            socketId,
            std::move(stream),
            std::move(streamCreator),
            Event{eventBase, socketId, EventType::Read, *this},
            Event{eventBase, socketId, EventType::Write, *this},
            pynt
        }
    );
}
void EventHandler::eventHandle(int fd, EventType type)
{
StoreData& info = store.getStoreData(fd);
std::visit(ApplyEvent{*this, fd, type}, info);
/* The std::visit ApplyEvent object to call checkFileDescriptorOK()/addJob() below */
}
/*
 * Detect whether the socket has been closed at the other end.
 * A close by the peer induces a read event with no data available, so for
 * read events we peek one byte without consuming it.
 *
 * Returns true if work should be scheduled for the descriptor.
 * Returns false if the connection is gone; in that case a StateUpdateRemove
 * request has been queued on the Store.
 * Write events are never checked and always return true.
 */
bool EventHandler::checkFileDescriptorOK(int fd, EventType type)
{
    if (type == EventType::Write) {
        return true;
    }

    char            buffer;
    ssize_t const   result = recv(fd, &buffer, 1, MSG_PEEK);

    // recv() returning 0 means the peer has closed the connection.
    bool const closedByPeer = (result == 0);
    // EAGAIN/EWOULDBLOCK (no data right now) and EINTR (interrupted by a
    // signal) are transient conditions and do not mean the connection is broken.
    bool const hardError    = (result == -1
                               && errno != EAGAIN
                               && errno != EWOULDBLOCK
                               && errno != EINTR);

    if (closedByPeer || hardError)
    {
        // Remove a socket if the other end has been closed.
        std::cout << "Remove Socket\n";
        store.requestChange(StateUpdateRemove{fd});
        return false;
    }
    return true;
}
/*
 * Schedule one resumption of `work` on the job queue.
 *
 * The job resumes the CoRoutine and then asks the Store to update the state
 * of `fd` according to what the CoRoutine yielded; a CoRoutine that has
 * finished is treated as TaskYieldState::Remove.
 * The update goes through requestChange() because the job can run on any thread.
 */
void EventHandler::addJob(CoRoutine& work, int fd)
{
    auto job = [&work, fd, &store = this->store]()
    {
        // Default to removal; only overridden if the CoRoutine yielded a value.
        TaskYieldState nextState = TaskYieldState::Remove;
        if (work()) {
            nextState = work.get();
        }

        if (nextState == TaskYieldState::RestoreRead) {
            store.requestChange(StateUpdateRestoreRead{fd});
        }
        else if (nextState == TaskYieldState::RestoreWrite) {
            store.requestChange(StateUpdateRestoreWrite{fd});
        }
        else if (nextState == TaskYieldState::Remove) {
            store.requestChange(StateUpdateRemove{fd});
        }
    };
    jobQueue.addJob(std::move(job));
}
/*
* Callback function controlled by timer.
* This is used to process all pending changes.
*
* Thus all changes to state are done by the main thread.
*/
/*
 * One tick of the control timer (called via controlTimerCallback).
 * Applies the update requests queued on the Store, then schedules the next tick.
 */
void EventHandler::controlTimerAction()
{
    // Apply every pending state change.
    store.processUpdateRequest();

    // Re-arm the timer so this runs again after controlTimerPause microseconds.
    timer.add(controlTimerPause);
}