0
\$\begingroup\$

I think I made a mistake trying to start from the simplest version and working forward. This time I will present the final version, collect all the fixes from here and incorporate them into the code; then I can reverse engineer the steps to get there.

The Parts of the server are:

This post deals only with EventHandler.

EventHandlerLibEvent.h provides RAII wrappers for the LibEvent types that I need. The EventHandler uses LibEvent to set callbacks and handlers. There is a timer that updates the Store state, and any events on a file descriptor cause jobs to be added to the JobQueue.

EventHandlerLibEvent.h

#ifndef THORSANVIL_NISSE_EVENT_HANDLER_LIBEVENT_H
#define THORSANVIL_NISSE_EVENT_HANDLER_LIBEVENT_H
/*
 * Wrapper classes to give LibEvent types proper RAII characteristics.
 *
 * EventBase is not copyable or movable. libEvent event_base
 * Event is moveable. libEvent event
 */
#include "NisseConfig.h"
#include <event2/event.h>
#include <stdexcept>
#include <utility>
namespace ThorsAnvil::Nisse
{
using LibEventEventBase = ::event_base;
using LibEventEvent = ::event;
using LibEventTimeOut = ::timeval;
class EventHandler;
// Kind of socket event being listened for; the values are the libevent flag bits
// so they can be passed to event_new() with a plain static_cast<short>.
enum class EventType : short
{
    Read  = EV_READ,
    Write = EV_WRITE
};
class Event;
/*
 * Owning wrapper around a libevent event_base.
 *
 * The base is created in the constructor and released in the destructor.
 * The object is neither copyable nor movable, so the pointer it holds
 * stays valid for the whole lifetime of the object.
 */
class EventBase
{
    // Event needs the raw event_base pointer to create events on this base.
    friend class Event;
    LibEventEventBase*      eventBase;

    public:
        /*
         * Creates a new event_base.
         * Throws std::runtime_error if libevent could not create one.
         */
        EventBase()
            : eventBase(event_base_new())
        {
            // event_base_new() reports failure by returning a null pointer.
            // Fail here rather than handing a null base to every later libevent call.
            if (eventBase == nullptr) {
                throw std::runtime_error("ThorsAnvil::Nisse::EventBase: event_base_new() failed");
            }
        }
        ~EventBase()
        {
            event_base_free(eventBase);
        }

        EventBase(EventBase const&)             = delete;
        EventBase(EventBase&&)                  = delete;
        EventBase& operator=(EventBase const&)  = delete;
        EventBase& operator=(EventBase&&)       = delete;

        /*
         * Runs the libevent loop on this base.
         * EVLOOP_NO_EXIT_ON_EMPTY is passed so the loop keeps running
         * even when no events are currently registered.
         */
        void run()
        {
            event_base_loop(eventBase, EVLOOP_NO_EXIT_ON_EMPTY);
        }
};
/*
 * Owning wrapper around a libevent event.
 *
 * Move-only: ownership of the underlying event can be transferred but not shared.
 * A default-constructed or moved-from Event holds no event (nullptr).
 */
class Event
{
    LibEventEvent*      event;

    public:
        // Timer event on eventBase (defined in EventHandler.cpp).
        Event(EventBase& eventBase, EventHandler& eventHandler);
        // Read or write event for fd on eventBase (defined in EventHandler.cpp).
        Event(EventBase& eventBase, int fd, EventType type, EventHandler& eventHandler);

        // Empty Event that owns nothing.
        Event()
            : event(nullptr)
        {}

        // Takes ownership from `move`, leaving it empty.
        Event(Event&& move) noexcept
            : event(std::exchange(move.event, nullptr))
        {}

        /*
         * Takes ownership from `move`, leaving it empty.
         * Any event already held by this object is freed first; simply overwriting
         * the pointer would leak it. Self-assignment is a no-op.
         */
        Event& operator=(Event&& move) noexcept
        {
            if (this != &move) {
                if (event) {
                    event_free(event);
                }
                event = std::exchange(move.event, nullptr);
            }
            return *this;
        }

        ~Event()
        {
            if (event) {
                event_free(event);
            }
        }

        Event(Event const&)             = delete;
        Event& operator=(Event const&)  = delete;

        // Registers the event with no timeout.
        void add()
        {
            event_add(event, nullptr);
        }

        /*
         * Registers the event with a timeout of microsecondsPause microseconds.
         * The value is split into whole seconds and a sub-second remainder so that
         * tv_usec stays below 1'000'000 even for pauses of a second or longer.
         */
        void add(int microsecondsPause)
        {
            constexpr int microsecondsPerSecond = 1000000;
            LibEventTimeOut timeout = {microsecondsPause / microsecondsPerSecond,
                                       microsecondsPause % microsecondsPerSecond};
            evtimer_add(event, &timeout);
        }
};
}
#endif

EventHandler.h

#ifndef THORSANVIL_NISSE_EVENT_HANDLER_H
#define THORSANVIL_NISSE_EVENT_HANDLER_H
/*
 * A thin wrapper on libEvent to C++ it.
 *
 * When a socket listener is first created via add() we store all data in the Store object.
 * When this has been created it adds the `ReadEvent` to libEvent to listen for any data.
 *
 * When (if) a socket event is triggered we save a lambda on the JobQueue via addJob() that will be
 * executed by a thread. The lambda restarts the CoRoutine, which will yield one of
 * three values.
 *
 * When the code yields one of three situations happens:
 * * TaskYieldState::RestoreRead We restore the read listener waiting for more data.
 * * TaskYieldState::RestoreWrite We restore the write listener waiting for space to write.
 * * TaskYieldState::Remove We destroy the socket and all associated data.
 *
 * Note: This data is never destroyed immediately because the code may be executing on any thread.
 * Instead a request is queued on the `Store` object. The main thread will then be used
 * to clean up data (See Store for details).
 */
#include "NisseConfig.h"
#include "NisseUtil.h"
#include "EventHandlerLibEvent.h"
#include "Store.h"
/*
 * C-Callback registered with LibEvent
 */
extern "C"
{
    // Invoked for read/write events on a socket; `data` carries the EventHandler.
    void eventCallback(evutil_socket_t fd, short eventType, void* data);
    // Invoked when the control timer fires; `data` carries the EventHandler.
    void controlTimerCallback(evutil_socket_t fd, short eventType, void* data);
}
namespace ThorsAnvil::Nisse
{
class JobQueue;
class Store;
struct StreamData;
struct ServerData;
/*
 * Connects libevent to the JobQueue and the Store:
 *  - socket events are turned into jobs on the JobQueue,
 *  - a repeating control timer asks the Store to process its pending updates.
 */
class EventHandler
{
    // Microseconds between control-timer iterations.
    static constexpr int    controlTimerPause = 1000;

    JobQueue&               jobQueue;
    Store&                  store;
    // Declared before `timer`: the constructor builds `timer` from `eventBase`,
    // and members are initialised in declaration order.
    EventBase               eventBase;
    Event                   timer;

    public:
        EventHandler(JobQueue& jobQueue, Store& store);

        // Runs the event loop.
        void run();

        // Register a listening server / a connected stream with the handler.
        void add(TAS::Server&& stream, ServerCreator&& creator, Pynt& pynt);
        void add(TAS::SocketStream&& stream, StreamCreator&& creator, Pynt& pynt);

    private:
        // The C callbacks registered with libevent forward into these two members.
        friend void ::eventCallback(evutil_socket_t fd, short eventType, void* data);
        friend void ::controlTimerCallback(evutil_socket_t fd, short eventType, void* data);
        void eventHandle(int fd, EventType type);
        void controlTimerAction();

        // Helpers used by ApplyEvent.
        bool checkFileDescriptorOK(int fd, EventType type);
        void addJob(CoRoutine& work, int fd);

        /*
         * Visitor applied to the data stored for a file descriptor.
         * Server entries are always queued as a job; stream entries are queued
         * only when checkFileDescriptorOK() accepts the descriptor.
         */
        struct ApplyEvent
        {
            EventHandler&   handler;
            int             fd;
            EventType       type;

            ApplyEvent(EventHandler& owner, int socketFd, EventType eventType)
                : handler(owner)
                , fd(socketFd)
                , type(eventType)
            {}

            void operator()(ServerData& info)
            {
                handler.addJob(info.coRoutine, fd);
            }
            void operator()(StreamData& info)
            {
                if (handler.checkFileDescriptorOK(fd, type)) {
                    handler.addJob(info.coRoutine, fd);
                }
            }
        };
};
}
#endif

EventHandler.cpp

#include "EventHandler.h"
#include "NisseUtil.h"
#include "JobQueue.h"
#include "Store.h"
/*
 * C Callback functions.
 * Simply decode the data into an EventHandler and call the C++ functions.
 */
void eventCallback(evutil_socket_t fd, short eventType, void* data)
{
    // `data` is the EventHandler address that the Event constructor passed to event_new().
    // Converting a void* back to the pointer type it came from is a static_cast;
    // reinterpret_cast is not needed.
    auto* eventHandler = static_cast<ThorsAnvil::Nisse::EventHandler*>(data);
    eventHandler->eventHandle(fd, static_cast<ThorsAnvil::Nisse::EventType>(eventType));
}
void controlTimerCallback(evutil_socket_t, short, void* data)
{
    // `data` is the EventHandler address that the Event constructor passed to evtimer_new().
    // Converting a void* back to the pointer type it came from is a static_cast;
    // reinterpret_cast is not needed. The fd and event flags are unused for the timer.
    auto* eventHandler = static_cast<ThorsAnvil::Nisse::EventHandler*>(data);
    eventHandler->controlTimerAction();
}
using namespace ThorsAnvil::Nisse;
/*
 * EventLib wrapper. Set up C-Function callbacks
 */
Event::Event(EventBase& eventBase, int fd, EventType type, EventHandler& eventHandler)
    : event(event_new(eventBase.eventBase,
                      fd,
                      static_cast<short>(type),
                      eventCallback,
                      &eventHandler))
{
    // The handler's address is the opaque argument libevent hands back to eventCallback.
}
Event::Event(EventBase& eventBase, EventHandler& eventHandler)
    : event(evtimer_new(eventBase.eventBase,
                        &controlTimerCallback,
                        &eventHandler))
{
    // Timer event: the handler's address is handed back to controlTimerCallback.
}
/*
 * Stores the job queue and the store, creates the event base and the control
 * timer on it, then starts the timer so controlTimerAction() gets called.
 */
EventHandler::EventHandler(JobQueue& jobQueue, Store& store)
    : jobQueue(jobQueue)
    , store(store)
    , eventBase()
    , timer(eventBase, *this)
{
    // Arm the control timer for its first iteration.
    timer.add(controlTimerPause);
}
void EventHandler::run()
{
 eventBase.run();
}
/*
 * Registers a server socket.
 * Everything needed (the server, its creator, a read event for its descriptor
 * and the pynt) is bundled into a StateUpdateCreateServer request for the store.
 */
void EventHandler::add(TAS::Server&& server, ServerCreator&& serverCreator, Pynt& pynt)
{
    // Read the descriptor before `server` is moved into the request.
    int socketFd = server.socketId();

    store.requestChange(
        StateUpdateCreateServer{
            socketFd,
            std::move(server),
            std::move(serverCreator),
            Event{eventBase, socketFd, EventType::Read, *this},
            pynt
        }
    );
}
/*
 * Registers a connected stream.
 * Everything needed (the stream, its creator, a read event and a write event
 * for its descriptor, and the pynt) is bundled into a StateUpdateCreateStream
 * request for the store.
 */
void EventHandler::add(TAS::SocketStream&& stream, StreamCreator&& streamCreator, Pynt& pynt)
{
    // Read the descriptor before `stream` is moved into the request.
    int socketFd = stream.getSocket().socketId();

    store.requestChange(
        StateUpdateCreateStream{
            socketFd,
            std::move(stream),
            std::move(streamCreator),
            Event{eventBase, socketFd, EventType::Read, *this},
            Event{eventBase, socketFd, EventType::Write, *this},
            pynt
        }
    );
}
void EventHandler::eventHandle(int fd, EventType type)
{
 StoreData& info = store.getStoreData(fd);
 std::visit(ApplyEvent{*this, fd, type}, info);
 /* The std::visit ApplyEvent object to call checkFileDescriptorOK()/addJob() below */
}
/*
 * Detects whether the socket has been closed at the other end.
 *
 * A close by the peer induces a read event with no data available, so for
 * read events we peek one byte (MSG_PEEK leaves it in the socket buffer).
 * Write events are always accepted.
 *
 * Returns true if the descriptor should be processed. Returns false if the
 * socket is closed or in error, in which case a StateUpdateRemove has been
 * queued on the store.
 */
bool EventHandler::checkFileDescriptorOK(int fd, EventType type)
{
    if (type == EventType::Write) {
        return true;
    }

    char    buffer;
    ssize_t result;
    do {
        // A recv() interrupted by a signal says nothing about the socket's state.
        // Retry it rather than mistaking EINTR for a closed connection.
        result = recv(fd, &buffer, 1, MSG_PEEK);
    }
    while (result == -1 && errno == EINTR);

    // 0 bytes: orderly shutdown by the peer.
    bool const peerClosed = (result == 0);
    // -1 with EAGAIN/EWOULDBLOCK only means "nothing to read right now".
    bool const hardError  = (result == -1 && errno != EAGAIN && errno != EWOULDBLOCK);

    if (peerClosed || hardError)
    {
        // Remove a socket if the other end has been closed.
        std::cout << "Remove Socket\n";
        store.requestChange(StateUpdateRemove{fd});
        return false;
    }
    return true;
}
/*
 * Queues a job that resumes the given CoRoutine and then tells the store what
 * to do with the descriptor next.
 *
 * The job may run on any thread, so the follow-up is always made through
 * store.requestChange() and never applied directly.
 */
void EventHandler::addJob(CoRoutine& work, int fd)
{
    jobQueue.addJob([&work, fd, &store = this->store]()
    {
        // Resume the coroutine. If it has nothing more to yield, the socket is removed.
        bool const yielded = static_cast<bool>(work());
        if (!yielded) {
            store.requestChange(StateUpdateRemove{fd});
            return;
        }

        // Otherwise act on the state the coroutine yielded.
        TaskYieldState const next = work.get();
        switch (next)
        {
            case TaskYieldState::RestoreRead:
                store.requestChange(StateUpdateRestoreRead{fd});
                break;
            case TaskYieldState::RestoreWrite:
                store.requestChange(StateUpdateRestoreWrite{fd});
                break;
            case TaskYieldState::Remove:
                store.requestChange(StateUpdateRemove{fd});
                break;
        }
    });
}
/*
 * Callback function controlled by timer.
 * This is used to processes all pending changes.
 *
 * Thus all changes to state are done by the main thread.
 */
void EventHandler::controlTimerAction()
{
    // Apply every state change that has been requested since the last iteration.
    store.processUpdateRequest();

    // Re-arm the timer so this action runs again after controlTimerPause microseconds.
    timer.add(controlTimerPause);
}
asked Oct 15, 2024 at 5:46
\$\endgroup\$

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.