The Epic EOS SDK is not thread-safe: all of its "C" calls must be made from a single thread, and that thread must be the one on which the SDK was initialized.
This is based on @G. Sliepen's answer to my previous question, deferred-networking-eos. It is not a self-answer to that question, because everything is new: the architecture, the code, and the meaning.
Only trivial synchronization is used. There is no catch in main, for simplicity. The channel is always 0, for simplicity.
The main.cpp provided simply sends and receives two text strings.
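As a minimal illustration of the single-thread constraint (not part of the code under review), the sketch below funnels every call through one worker thread and hands a future back to the caller. SingleThreadExecutor and submit() are hypothetical names, and the submitted callable only stands in for a call into the SDK.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper (not part of the EOS SDK or of the code below):
// runs every submitted callable on one dedicated thread.
class SingleThreadExecutor {
    std::mutex m_mutex;
    std::condition_variable m_wakeup;
    std::queue<std::packaged_task<std::thread::id()>> m_tasks;
    bool m_stop = false;
    std::thread m_thread;   // declared last so it starts after the other members exist

    void run() {
        for (;;) {
            std::packaged_task<std::thread::id()> task;
            {
                std::unique_lock lock(m_mutex);
                m_wakeup.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty())
                    return;             // stop requested and nothing left to do
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();                     // executed outside the lock, always on m_thread
        }
    }

public:
    SingleThreadExecutor() : m_thread(&SingleThreadExecutor::run, this) {}

    ~SingleThreadExecutor() {
        {
            std::lock_guard lock(m_mutex);
            m_stop = true;
        }
        m_wakeup.notify_one();
        m_thread.join();
    }

    // Queue a callable; the future reports the id of the thread that ran it.
    std::future<std::thread::id> submit(std::function<void()> work) {
        std::packaged_task<std::thread::id()> task(
            [work = std::move(work)] {
                work();                 // stand-in for a call into the SDK
                return std::this_thread::get_id();
            });
        auto future = task.get_future();
        {
            std::lock_guard lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_wakeup.notify_one();
        return future;
    }
};
```

Any thread may call submit(), but the work itself always runs on the executor's own thread, which is the property the SDK requires.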
Async/Environs.h
#pragma once
namespace Async {
namespace detail_ {
struct Environs {
    const EOS_HPlatform m_platformHandle;
    EOS_ProductUserId const m_localUserId, m_friendLocalUserId;
};
} // namespace detail_
typedef std::future< Networking::messageData_t > future_t;
} // namespace Async
namespace Async::Transport {
using Environs = detail_::Environs;
} // namespace Async::Transport
Async/Selector/IMultiplex.h
#pragma once
namespace Async::detail_::Selector {
typedef std::function<bool()> task_arg_t;
typedef std::packaged_task< Networking::messageData_t(task_arg_t const&) > task_t;
enum class Direction { Outgoing, Incoming };
struct IMux {
    virtual void outgoing(task_t &&) = 0;
    virtual void incoming(task_t &&) = 0;
    virtual ~IMux() {}
};
typedef std::shared_ptr< IMux > multiplex_t;
struct IDemux {
    virtual ~IDemux() {}
    virtual bool pop(task_t*, Direction *) = 0;
};
typedef std::shared_ptr< IDemux > demultiplex_t;
} // namespace Async::detail_::Selector
namespace Async::Transport {
using multiplex_t = detail_::Selector::multiplex_t;
using task_t = detail_::Selector::task_t;
using task_arg_t = detail_::Selector::task_arg_t;
} // namespace Async::Transport
Async/Selector/Multiplexer.h
#pragma once
namespace Async::detail_::Selector {
class Multiplexer : public IMux, public IDemux {
    typedef std::pair< task_t, Direction > queue_elem_t;
    std::queue< queue_elem_t > m_fifo;
    std::mutex m_mu;
    bool pop(task_t *p, Direction *direction) override {
        if ( m_fifo.empty( ) )
            return false;
        queue_elem_t x;
        { std::lock_guard lock( m_mu );
            x = std::move( m_fifo.front( ) );
            m_fifo.pop( );
        }
        *p = std::move( x.first );
        *direction = x.second;
        return true;
    }
public:
    void outgoing(task_t &&task) override {
        std::lock_guard lock( m_mu );
        m_fifo.push( { std::move( task ), Direction::Outgoing } );
    }
    void incoming(task_t &&task) override {
        std::lock_guard lock( m_mu );
        m_fifo.push( { std::move( task ), Direction::Incoming } );
    }
    virtual ~Multiplexer() {}
};
} // namespace Async::detail_::Selector
Async/GradualExecutor.h
#pragma once
namespace Async::detail_ {
class GradualExecutor {
    Selector::demultiplex_t m_demultiplexer;
    const EOS_HPlatform m_platformHandle;
    static constexpr auto c_commandTO{ std::chrono::seconds{ 20 } };
    static constexpr auto now = std::chrono::system_clock::now;
public:
    GradualExecutor(Selector::demultiplex_t const& demultiplexer, Environs const& ctx) :
        m_demultiplexer( demultiplexer )
        , m_platformHandle( ctx.m_platformHandle )
    {}
    void all() {
        using namespace std::literals::chrono_literals;
        Selector::task_t task;
        Selector::Direction direction;
        while ( m_demultiplexer ->pop( &task, &direction ) ) {
            if ( Selector::Direction::Outgoing == direction ) {
                task( { } );
                ::EOS_Platform_Tick( m_platformHandle );
            }
            const auto timeout = now( ) + c_commandTO;
            if ( Selector::Direction::Incoming == direction ) {
                Selector::task_arg_t function( [this, &timeout] {
                    ::EOS_Platform_Tick( m_platformHandle );
                    std::this_thread::sleep_for( 300ms );
                    return ( now( ) < timeout );
                } );
                task( function );
            }
        }
    }
};
} // namespace Async::detail_
Async/Transport/Send.h
#pragma once
namespace Async::Transport {
class Send {
    multiplex_t m_multiplexer; // shared_ptr to avoid dangling
    const EOS_HP2P m_p2PHandle;
    EOS_P2P_SendPacketOptions m_options{ EOS_P2P_SENDPACKET_API_LATEST };
    EOS_P2P_SocketId m_sendSocketId{ EOS_P2P_SOCKETID_API_LATEST };
    detail_::Acceptor m_acceptor;
    typedef Networking::messageData_t messageData_t;
    template<typename T>
    future_t send_(T const& container) const {
        task_t task =
            std::packaged_task( [this, container](task_arg_t const&) ->messageData_t
            {
                const size_t maxDataLengthBytes = Networking::c_MaxDataSizeBytes;
                auto it = container.begin( );
                while ( it != container.end( ) ) {
                    const size_t distance = std::distance( it, container.end( ) );
                    const size_t dataLengthBytes = std::min( maxDataLengthBytes, distance );
                    EOS_P2P_SendPacketOptions options = m_options;
                    options.DataLengthBytes = static_cast< uint32_t >( dataLengthBytes );
                    options.Data = std::addressof( *it );
                    EOS_EResult result = ::EOS_P2P_SendPacket( m_p2PHandle, &options );
                    if ( EOS_EResult::EOS_Success != result )
                        throw std::runtime_error( "error EOS_P2P_SendPacket" );
                    it += dataLengthBytes;
                }
                return { };
            } );
        auto future = task.get_future( );
        m_multiplexer ->outgoing( std::move( task ) );
        return future;
    }
public:
    Send(Environs const& ctx, std::string const& socketName, multiplex_t const& mux) :
        m_multiplexer( mux )
        , m_p2PHandle( ::EOS_Platform_GetP2PInterface( ctx.m_platformHandle ) )
        , m_acceptor( ctx, socketName )
    {
        strcpy_s( m_sendSocketId.SocketName , socketName.c_str( ) );
        m_options.LocalUserId = ctx.m_localUserId;
        m_options.RemoteUserId = ctx.m_friendLocalUserId;
        m_options.SocketId = &m_sendSocketId;
        m_options.bAllowDelayedDelivery = EOS_TRUE;
        m_options.Channel = 0; // zero for simplicity
        m_options.Reliability = EOS_EPacketReliability::EOS_PR_ReliableOrdered;
        m_options.bDisableAutoAcceptConnection = EOS_FALSE;
    }
    auto text(std::string const& text) const {
        return send_( text );
    }
    auto vector(messageData_t const& vector) const {
        return send_( vector );
    }
};
} // namespace Async::Transport
Async/Transport/Recv.h
#pragma once
namespace Async::Transport {
class Recv {
    multiplex_t m_multiplexer; // shared_ptr to avoid dangling
    const uint8_t m_channel;
    const EOS_HP2P m_p2PHandle;
    const uint8_t* m_requestedChannel = &m_channel;
    EOS_P2P_ReceivePacketOptions m_options{ EOS_P2P_RECEIVEPACKET_API_LATEST };
    detail_::Acceptor m_acceptor;
    typedef Networking::messageData_t messageData_t;
public:
    Recv(Environs const& ctx, std::string const& socketName, multiplex_t const& mux) :
        m_multiplexer( mux )
        , m_channel( 0 ) // zero for simplicity
        , m_p2PHandle( ::EOS_Platform_GetP2PInterface( ctx.m_platformHandle ) )
        , m_acceptor( ctx, socketName )
    {
        m_options.LocalUserId = ctx.m_localUserId;
        m_options.MaxDataSizeBytes = Networking::c_MaxDataSizeBytes;
        m_options.RequestedChannel = m_requestedChannel;
    }
    [[nodiscard]] auto byLength(size_t len) const {
        task_t task =
            std::packaged_task( [this, len](task_arg_t const& tick) ->messageData_t
            {
                EOS_ProductUserId unused_;
                EOS_P2P_SocketId socketId;
                uint8_t channel = 0;
                messageData_t container( len );
                uint32_t bytesWritten = 0;
                auto it = container.begin( );
                while ( it != container.end( ) ) {
                    EOS_EResult result = ::EOS_P2P_ReceivePacket( m_p2PHandle, &m_options
                        , &unused_, &socketId, &channel, std::addressof( *it ), &bytesWritten );
                    if ( EOS_EResult::EOS_Success == result ) {
                        it += bytesWritten;
                        continue;
                    }
                    if ( EOS_EResult::EOS_NotFound != result )
                        throw std::runtime_error( "error EOS_P2P_ReceivePacket" );
                    if ( !tick( ) )
                        return { };
                }
                return container;
            } );
        auto future = task.get_future( );
        m_multiplexer ->incoming( std::move( task ) );
        return future;
    }
};
} // namespace Async::Transport
Async/TickerCore.h
#pragma once
namespace Async::detail_ {
struct TickerCore {
    std::shared_ptr< Selector::Multiplexer > m_mux;
    Selector::multiplex_t m_multiplexer;
    Selector::demultiplex_t m_demultiplexer;
    Environs m_ctx;
    GradualExecutor m_executor;
    typedef std::unique_ptr< TickerCore > uptr_t;
    TickerCore(PrepareEos::prepared_t const& oes) :
        m_mux( std::make_shared< Selector::Multiplexer >( ) )
        , m_multiplexer( m_mux )
        , m_demultiplexer( m_mux )
        , m_ctx{ oes ->m_platformHandle, oes ->m_auth ->getLocalUserId( ), oes ->m_mapping ->getFriendLocalUserId( ) }
        , m_executor( m_demultiplexer, m_ctx )
    {}
};
} // namespace Async::detail_
Async/Thread/GameThread.h
#pragma once
namespace Async::Thread::detail_ {
class GameThread {
    const bool m_isServer;
    using TickerCore = Async::detail_::TickerCore;
    TickerCore::uptr_t m_core;
protected:
    std::atomic_bool m_bPrepared = false, m_bStop = false;
    void run_() {
        auto oes = Async::detail_::PrepareEos::ordinary( m_isServer );
        if ( !oes )
            return;
        m_core = TickerCore::uptr_t( new TickerCore( oes ) );
        // ugly, but need only this thread for init and working
        m_bPrepared = true;
        while ( !m_bStop ) {
            std::this_thread::sleep_for( std::chrono::microseconds{ 300 } );
            m_core ->m_executor.all( );
            ::EOS_Platform_Tick( m_core ->m_ctx.m_platformHandle );
        }
    }
    explicit GameThread(bool isServer) :
        m_isServer( isServer )
    {}
public:
    virtual ~GameThread() {}
    Transport::Send createSender(std::string const& m_socketName) const {
        return Transport::Send( m_core ->m_ctx, m_socketName, m_core ->m_multiplexer );
    }
    Transport::Recv createReceiver(std::string const& m_socketName) const {
        return Transport::Recv( m_core ->m_ctx, m_socketName, m_core ->m_multiplexer );
    }
};
} // namespace Async::Thread::detail_
Async/Thread/JThread.h
#pragma once
namespace Async::Thread { struct FactoryInfiniteWait; } // forward decl
namespace Async::Thread::detail_ {
class JThread final : public GameThread {
    std::future<void> m_future;
public:
    JThread(bool isServer) :
        GameThread( isServer )
    {
        // convenient, but leakage may occur if the process is quickly exited
        m_future = std::async( &JThread::run_, this );
    }
    ~JThread() {
        if ( m_future.valid( ) )
            GameThread::m_bStop = true, m_future.wait( );
    }
    bool isValid() const {
        return GameThread::m_bPrepared;
    }
    auto &getFuture() {
        return m_future;
    }
};
} // namespace Async::Thread::detail_
Async/Thread/Factory.h
#pragma once
namespace Async::Thread {
struct FactoryInfiniteWait {
    typedef std::unique_ptr< detail_::GameThread > gameThread_t;
    static gameThread_t gameThread(bool isServer) {
        using namespace std::literals::chrono_literals;
        const auto timeout = std::future_status::timeout;
        // hide JThread from user
        auto *p = new detail_::JThread( isServer );
        auto &future = p ->getFuture( );
        auto gameThread = gameThread_t( p );
        std::future_status stat = std::future_status::ready;
        while ( true
            && !p ->isValid( )
            && ( timeout == ( stat = future.wait_for( 300ms ) ) )
            ) (void)0;
        if ( timeout == stat )
            return gameThread;
        // void or rethrow exception
        future.get( );
        return { };
    }
};
} // namespace Async::Thread
demo main.cpp
#include "pch.h"
#include "Async/Environs.h"
#include "Async/Selector/IMultiplex.h"
#include "Async/Selector/Multiplexer.h"
#include "Async/GradualExecutor.h"
#include "Async/Transport/Send.h"
#include "Async/Transport/Recv.h"
#include "Async/TickerCore.h"
#include "Async/Thread/GameThread.h"
#include "Async/Thread/JThread.h"
#include "Async/Thread/Factory.h"
int main(int argc, char *argv[]) {
    bool isServer = ( argc > 1 );
    auto oes = Async::Thread::FactoryInfiniteWait::gameThread( isServer );
    if ( !oes )
        return ECONNRESET;
    auto socketNameChat = "CHAT";
    std::string text0 = "Hello Asy", text1 = "nc World!";
    if ( isServer ) {
        Async::Transport::Recv recvChat = oes ->createReceiver( socketNameChat );
        Async::future_t command0 = recvChat.byLength( text0.length( ) );
        Async::future_t command1 = recvChat.byLength( text1.length( ) );
        Networking::messageData_t incoming0 = command0.get( );
        Networking::messageData_t incoming1 = command1.get( );
        assert( text0 == std::string( incoming0.begin( ), incoming0.end( ) ) );
        assert( text1 == std::string( incoming1.begin( ), incoming1.end( ) ) );
    } else {
        Async::Transport::Send sendChat = oes ->createSender( socketNameChat );
        sendChat.text( text0 ).wait( );
        sendChat.text( text1 ).wait( );
    }
    return printf( "press [Enter] to exit\n" ), getchar( ), 0;
}
detail_::Acceptor is omitted because it is not significantly involved in the work.
Async::detail_::PrepareEos is skipped because it is a lot of monotonous, flat code.
Full code on GitHub repo
1 Answer
Naming things
There are several things that are named confusingly. Names should be precise and concise; they should convey quite clearly what they stand for in one or a few words. Prefer to use commonly used words. Also be consistent in how you name things. Don't abbreviate unnecessarily though. Prefer to use nouns for variables and types, and verbs for functions. Use the plural for containers that potentially hold multiple values.
Here is a list of things I think can be improved:
- Environs: this is an uncommon synonym of environment. It is usually used to describe the set of environment variables passed to programs. Here it is used rather as a context, which is made clear by the variable m_ctx. Also, it is specifically holding the context used by the EOS SDK, so I would call this EOS_Context.
- future_t: this is specifically a future holding Networking::messageData_t. Maybe rename it to messageData_future_t. Consider that you might add other things besides network functionality to namespace Async. What if you need futures holding other data types? This also brings me to:
  - Avoid using the _t suffix for type aliases. Just use the same naming scheme you use for other types: start with a capital. Consider that at some point you might change a type alias to become its own class. Would you want to rename it everywhere then?
  - Consider not creating a type alias for everything. Sure, an alias might save some typing in some places, but not creating one avoids hiding information about the type.
- task_arg_t: yes, it's used as the argument for the constructor of task_t, but on its own it is just the function you want the task to execute. So perhaps task_function_t?
- multiplex_t: prefer nouns for types, so multiplexer_t. Also, it again hides that this is a shared pointer.
- queue_elem_t: avoid abbreviations. I suggest using queue_element_t or queue_item_t instead.
- m_fifo: that describes what kind of container it is, but I think it's more important to know what it holds. I would use m_tasks here (more about that later).
- m_mu: don't abbreviate, write m_mutex.
- x: one-letter variables are sometimes OK to use, but only if their name and meaning match how they are commonly used, like i for a loop index, or x if it is an x-coordinate. Here it represents a single task from the queue, so task?
- p: similar, but more about that later.
- Send: use nouns for types: Sender is much better. However, given that it wraps an EOS channel, I would name it TransmitChannel or TxChannel.
- text(): use verbs for functions: send_text(). The same goes for vector(). Note that you can also just create two member functions named send() with different argument types.
- outgoing(), incoming(): again, make them verbs: push_outgoing(), push_incoming().
- GameThread: despite its name, it doesn't do anything with threads. Sure, someone might call run_() inside a thread, but they don't have to. It's misnamed for sure, but that's not the only problem of this class, so I'll discuss this more below.
- JThread: sounds like std::jthread, does something similar, but is also quite different. This is actually running run_() in a thread, so maybe this should be called GameThread?
- FactoryInfiniteWait: a factory that waits forever? What does it create and what does it wait for? It only contains a static member function, so this should not be a struct at all. Rather, that member function should be a free function named createGameThread or something similar.
Make the task type hold the direction
You made things more complicated than necessary by making task_t only hold the packaged task, and let the direction be stored in queue_elem_t. I think it would be much better to store the direction inside a class representing tasks:
struct Task {
    std::packaged_task<Networking::messageData_t(task_arg_t const&)> task;
    Direction direction;
};
This will simplify Multiplexer a lot, especially if you also:
Use std::optional
Use std::optional to return optional values. Consider this:
class Multiplexer: ... {
    std::queue<Task> m_tasks;
    std::mutex m_mutex;

    std::optional<Task> pop() override {
        std::lock_guard lock(m_mutex);
        if (m_tasks.empty())   // checked under the lock to avoid a data race
            return std::nullopt;
        Task task = std::move(m_tasks.front());
        m_tasks.pop();
        return task;
    }

    void push(Task task) override {
        std::lock_guard lock(m_mutex);
        m_tasks.push(std::move(task));
    }
};
The code using this can then pop like so:
while (auto task = m_demultiplexer->pop()) {
    if (Selector::Direction::Outgoing == task->direction) {
        task->task({});
        ...
And push like so:
auto task = std::packaged_task([](task_arg_t const&)->messageData_t{...});
...
m_multiplexer->push({std::move(task), Selector::Direction::Outgoing});
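Put together, the two suggestions above (a Task that carries its direction, and a pop() that returns std::optional) might look like the following self-contained sketch. The aliases MessageData and TaskFunction are stand-ins for Networking::messageData_t and task_arg_t, and TaskQueue and runAll() are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

using MessageData = std::vector<std::uint8_t>;   // stand-in for Networking::messageData_t
using TaskFunction = std::function<bool()>;      // stand-in for task_arg_t

enum class Direction { Outgoing, Incoming };

struct Task {
    std::packaged_task<MessageData(TaskFunction const&)> task;
    Direction direction;
};

class TaskQueue {
    std::queue<Task> m_tasks;
    std::mutex m_mutex;

public:
    void push(Task task) {
        std::lock_guard lock(m_mutex);
        m_tasks.push(std::move(task));
    }

    // Returns std::nullopt when the queue is empty; the check happens under the lock.
    std::optional<Task> pop() {
        std::lock_guard lock(m_mutex);
        if (m_tasks.empty())
            return std::nullopt;
        std::optional<Task> task(std::move(m_tasks.front()));
        m_tasks.pop();
        return task;
    }
};

// Drain the queue the way an executor loop would; returns how many tasks ran.
inline int runAll(TaskQueue& queue) {
    int executed = 0;
    while (auto task = queue.pop()) {
        task->task(TaskFunction{});     // an optional is dereferenced with -> or *
        ++executed;
    }
    return executed;
}
```

Because std::packaged_task is move-only, a Task has to be moved into push(), for example queue.push({std::move(work), Direction::Outgoing}), after the future has been obtained from it.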
Don't overcomplicate things
FactoryInfiniteWait is a terribly overcomplicated way to start a thread. I would get rid of it entirely, and then merge JThread into GameThread, so you have one object that, when created, runs a thread that handles game ticks, and when destructed stops that thread. Instead of having the thread create a TickerCore object, let the caller do that, and then pass a reference to it to the constructor.
class GameThread {
    std::atomic_bool m_stop = false;

    void run_(TickerCore& core) {
        while (!m_stop) {
            std::this_thread::sleep_for(std::chrono::microseconds{300});
            core.m_executor.all();
            ::EOS_Platform_Tick(core.m_ctx.m_platformHandle);
        }
    }

    std::thread m_thread;

public:
    GameThread(TickerCore& core):
        // std::ref is needed: std::thread copies its arguments by default
        m_thread(&GameThread::run_, this, std::ref(core)) {}

    ~GameThread() {
        m_stop = true;
        m_thread.join();
    }
};
Since C++20 you can use std::jthread instead, so you just need a free run_() function that takes a std::stop_token as a parameter.
Then in main() you write something like:
int main(int argc, char *argv[]) {
    ...
    auto oes = Async::detail_::PrepareEos::ordinary(isServer);
    auto tickerCore = TickerCore(oes);
    auto gameThread = GameThread(tickerCore);
    ...
}
- I always try to find a balance between "precise and concise", it doesn’t always work out, that’s true. About "nouns for variables and types, and verbs for functions" I got it. It will be difficult to use "plural for containers", I thought that this was an extra "s" because containers always have "multiple values", apparently just another attempt to shorten the naming.
  Alexovsky, commented Jan 26, 2024 at 22:16
- By the way, future_t was planned as strictly one type from std::future, so that there would be no desire to multiply entities. Among other things, "_t suffix for type aliases", the hardest thing... If "start with a capital" then I will think that I have another entity that needs "care", _t suffix for me gives confidence that almost nothing new is added to the code base, only something very well known appears. Yes, sometimes they become their own class, which is good, it's almost a holiday. After all, the software is growing to an important improvement, which is good news.
  Alexovsky, commented Jan 26, 2024 at 22:47
- And in a good mood I will rename them all. About "not creating a type alias for everything ... it avoids hiding information about the type" hmm, I’ll have to think about it, thank you. Wow, task_arg_t is really a function if you look at it from the other side. About m_fifo, it’s really better to indicate not the type of functionality, but a hint about the stored data, I often miss this. This m_mu was simply overlooked. Although no, it’s a bad habit. Yeah, about Send we mean Async::Transport::Send, probably everyone will like TxChannel, and very briefly.
  Alexovsky, commented Jan 26, 2024 at 22:52
- Yes, in JThread I tried to reproduce some of the functionality from std::jthread, in terms of cleaning in the destructor, apparently it turned out ugly. FactoryInfiniteWait is a fiasco. The only thing I can say in defense of struct is that struct/class is more convenient for unit tests. The new struct Task will really simplify everything, I’m ready to introduce it as soon as possible. I observe some kind of hostility towards std::optional, strange, the thing itself is very good. Overall, these two sections provide a perfect simplification, thanks for the idea, it will be done.
  Alexovsky, commented Jan 26, 2024 at 23:33
- Ugh, FactoryInfiniteWait + JThread + GameThread was a failed attempt to apply SOLID. Something went wrong. Maybe at least move creation and destruction into a separate class? Unlikely, because it is used once. Fine. One entity instead of three. Still, there is a determination to try to leave at least std::async instead of the proposed std::thread, because of all its advantages.
  Alexovsky, commented Jan 27, 2024 at 0:07