My project has a single message stream (communicating with an external device) which is accessed via multiple client threads. The workflow is:
- There is a thread (the "messaging thread") which reads the message stream and is able to dispatch received messages to clients. This works by waiting on a thread-safe queue which the message stream exposes.
- Messages may be sent on the stream from any thread (the stream is thread-safe and handles concurrent send/receive).
- A client thread sends a request across the message stream, and provides a callback function.
- The client thread waits for responses. There may be multiple responses. The messaging thread knows how to match a received message to a client thread, based on the request sent by the client thread. The messaging thread invokes the right callback function for each received message.
- After sending the request, the client thread must block until all responses have been processed. This is normally indicated by the callback function returning a particular status, but it must also be possible to unblock a client thread and end the interaction when the messaging thread receives a particular message that it recognizes, or when the application shuts down.
- From time to time the messaging service may be unavailable (the messaging thread will know this); so the client threads must be able to wait for a certain time to see if the messaging service has become available.
Those are my requirements, but I can't get my head around how to code the logic for the client threads "entering" and waiting, how the client threads should communicate with the messaging thread, which synchronization primitives to use, and so on.
I'm coding in C++ with the Poco libraries, and can use all the usual primitives (mutex, event, semaphore, condition variable etc.) as well as higher level constructs like a notification queue, notification centre, and event dispatcher.
1 Answer
There are different ways to do it, but if you are inclined to stick with POCO, you may want to look at the macchina.io (OSP portion) WebEvent implementation - it is essentially a pub/sub messaging framework. There's more there than what you need but it's relatively simple and architecturally you should be able to quickly tailor it to your needs. I have used it in production for many years and it works well; it will also be ported in an OSP-independent form to Poco for one of the next releases.
A client can be either (1) a web socket endpoint or (2) an in-process observer. Either kind can post events (send data) and/or subscribe to one or more subjects (topics) to receive notifications. You'll probably need many in-process observers and one remote endpoint.
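To make the in-process observer idea concrete, here is a minimal sketch in plain standard C++. It is not the WebEvent API: `SubjectHub`, `subscribe`, and `post` are hypothetical names chosen for illustration, and delivery here happens synchronously on the posting thread rather than through a queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-process pub/sub hub (illustration only, not WebEvent).
class SubjectHub {
public:
    using Observer =
        std::function<void(const std::string& subject, const std::string& data)>;

    // Register an observer for one subject (topic).
    void subscribe(const std::string& subject, Observer observer) {
        assert(observer && "observer must be callable");
        std::lock_guard<std::mutex> lock(_mutex);
        _observers[subject].push_back(std::move(observer));
    }

    // Notify every observer of the subject; returns how many were notified.
    std::size_t post(const std::string& subject, const std::string& data) {
        std::vector<Observer> targets;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it = _observers.find(subject);
            if (it != _observers.end()) targets = it->second;
        }
        // Invoke outside the lock so an observer may itself subscribe or post.
        for (auto& observer : targets) observer(subject, data);
        return targets.size();
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::vector<Observer>> _observers;
};
```

In the question's terms, each client thread would subscribe to the subject that identifies its request, and the messaging thread would post each received message to the matching subject.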
The framework runs in two threads handling:
- Main queue - responsible for dispatching subscribe/unsubscribe request events from clients.
- Worker queue - responsible for dispatching the data events (messages).
Each queue is dealt with in its own thread and there is a dotted-notation naming scheme for subject names, see here for details. Note that the documentation only mentions WebSockets, but naming works exactly the same for in-process observers. You may also want or need a different naming scheme.
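Whichever framework you pick, the blocking requirement from the question still has to be handled on the client side. The following is a minimal sketch of one way to do that, assuming standard C++ primitives (`std::mutex`, `std::condition_variable`) instead of Poco or WebEvent. `Conversation`, `Status`, `deliver`, `cancel`, and `waitFor` are hypothetical names, and it assumes `deliver` and `cancel` are only called from the messaging thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical names throughout; a sketch, not a library API.
enum class Status { More, Done };

class Conversation {
public:
    using Callback = std::function<Status(const std::string&)>;

    explicit Conversation(Callback cb) : _callback(std::move(cb)) {
        assert(_callback && "callback must be callable");
    }

    // Messaging thread: pass one matched response to the client's callback.
    // Responses arriving after the conversation has ended are ignored.
    void deliver(const std::string& message) {
        if (isDone()) return;
        if (_callback(message) == Status::Done) finish(false);
    }

    // Messaging thread: abort message received, or application shutting down.
    void cancel() { finish(true); }

    // Client thread: block until the conversation ends.
    // Returns false if the timeout expired first.
    bool waitFor(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(_mutex);
        return _finished.wait_for(lock, timeout, [this] { return _done; });
    }

    bool cancelled() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _cancelled;
    }

private:
    bool isDone() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _done;
    }

    void finish(bool cancelled) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (!_done) {
                _done = true;
                _cancelled = cancelled;
            }
        }
        _finished.notify_all();  // wake the blocked client thread
    }

    Callback _callback;
    mutable std::mutex _mutex;
    std::condition_variable _finished;
    bool _done = false;
    bool _cancelled = false;
};
```

A client thread would create a `Conversation`, register it with the messaging thread, send its request, and then call `waitFor`. The same flag-plus-`wait_for` pattern could cover the "service unavailable" case: the messaging thread sets an availability flag under a mutex and notifies, while clients wait on it with a timeout.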