I would like someone to look at my class for reading messages from a network socket. In short, the whole app should behave like a regular ping command with some additional functionalities. The point here is that I want to separate the reading of messages, and processing (which is done each second). This object is run in a separate thread, and the main thread is responsibe for processing the messages. After each second the main thread is notified to process collected messages. My motivation for this approach (separating the work for reading and processing) is to remove the overload of message processing from the readout, since each message gets a time stamp immediately after it is read, which is used later for calculating RTT. Is this a good approach? If so, could the reading of messages be somehow improved? Or this is good enough?
public class MessageReader implements Runnable {
private MessageInputOutput messageIO;
private BlockingQueue<BufferQueueElement> queue;
/**
* Create an object for reading messages.
* @param messageIO Stream IO reference.
* @param queue Blocking queue.
*/
public MessageReader(MessageInputOutput messageIO, BlockingQueue<BufferQueueElement> queue) {
this.messageIO = messageIO;
this.queue = queue;
}
/**
* Start a thread for reading messages. The thread reads
* a message, stores it in a blocking queue and notifies the main
* thread to process the message. The intention is to remove the
* processing load from the receiving thread so that time measurements could
* be more accurate.
*/
public void run() {
String line = null;
long initTime = TimingClass.getTime();
long diff = 0;
int messageAcc = 0;
int numberOfMsgs = 0;
long currentTime = 0;
/* Create initial queue element for storing message data */
BufferQueueElement bufferElement = new BufferQueueElement();
try {
while ((line = messageIO.readMessage()) != null) {
/* getTime returns time in milliseconds */
currentTime = TimingClass.getTime();
diff = currentTime - initTime;
/* Ok, we can close the stream, but process the last message
* before closing. */
if (line.equals("OKBYE")) {
bufferElement.setMsgAcc(messageAcc);
bufferElement.setMsgNumber(numberOfMsgs);
bufferElement.setCloseQueue(true);
queue.put(bufferElement);
break;
}
/* Count total number of messages */
messageAcc++;
/* Count received messages in 1s */
numberOfMsgs++;
bufferElement.addListElement(line, currentTime);
if (diff > 1000) {
bufferElement.setMsgAcc(messageAcc);
bufferElement.setMsgNumber(numberOfMsgs);
/* Notify the main thread to process the message. */
queue.put((bufferElement));
numberOfMsgs = 0;
bufferElement = new BufferQueueElement();
initTime = TimingClass.getTime();
}
}
} catch (IOException e) {
System.out.println(e.getMessage());
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
/**
* Object for storing message information.
* These values are extracted on message receival.
* The object stores a map of <Message, ArrivalTime>
* @author pord911
*
*/
public class BufferQueueElement {
private Map<String, Long> lineList;
private int msgAcc;
private int msgNumber;
private boolean closeQueue = false;
public BufferQueueElement() {
lineList = new LinkedHashMap<String, Long>();
}
public Map<String, Long> getLineList() {
return lineList;
}
public void addListElement(String line, Long time) {
lineList.put(line, time);
}
public int getMsgAcc() {
return msgAcc;
}
public void setMsgAcc(int msgAcc) {
this.msgAcc = msgAcc;
}
public int getMsgNumber() {
return msgNumber;
}
public void setMsgNumber(int msgNumber) {
this.msgNumber = msgNumber;
}
public boolean isCloseQueue() {
return closeQueue;
}
public void setCloseQueue(boolean closeQueue) {
this.closeQueue = closeQueue;
}
}
public class TimingClass {
/**
* Get current time from the EPOCH.
* @return Time in milliseconds.
*/
public static long getTime() {
return System.currentTimeMillis();
}
}
And here is the printout:
C:\workspace\TCPPing\bin>java -cp .;../lib/* com.tcpping.main.TCPPing -p -port 9900 -mps 100 -size 2000 127.0.0.1
18:42:49: Total=108 Rate=108/s AvgRTT=1,917ms MaxRTT=12ms A->B=1,463ms B->A=0,454ms
18:42:49: Total=216 Rate=108/s AvgRTT=1,259ms MaxRTT=5ms A->B=0,676ms B->A=0,583ms
18:42:49: Total=324 Rate=108/s AvgRTT=0,574ms MaxRTT=2ms A->B=0,380ms B->A=0,194ms
18:42:49: Total=427 Rate=103/s AvgRTT=0,544ms MaxRTT=1ms A->B=0,340ms B->A=0,204ms
18:42:49: Total=500 Rate=73/s AvgRTT=0,493ms MaxRTT=1ms A->B=0,356ms B->A=0,137ms
Messages sent: 500, Messages received: 500, Messages lost: 0
1 Answer 1
It is quite hard to review the code without all classes.
However if you want to separate the reading from processing, I would be tempted to remove the notification from your MessageReader
.
I would have designed this with a queue where the MessageReader
can continually push messages. On the other side, a MessageProcessor
could peek all messages from the queue.
Ideally, I will use a ScheduledExecutorService
to schedule the processor each seconds.
Another change that can be nice is to create a Message
class with the line
, counter
, number?
, .. and add some "business" methods like isClosure
to encapsulate the line.equals("OKBYE")
.
-
\$\begingroup\$ Yes, sorry for the things/classes which are not here. I will update the post. Regarding the continuous push of messages to the processor, seems ok for me, maybe storing the messages in a list and pushing every second is an overkill? Since I have to extract these messages in the processor, maybe better to do the 1s processing in the processor. But leave the 1s counting in the Reader thread ( diff = currentTime - initTime; ) and send the diff to the processor, since the blocking queue could add unwanted delay, if the time calculation happens in the processor? \$\endgroup\$pord911– pord9112017年01月25日 17:33:13 +00:00Commented Jan 25, 2017 at 17:33
-
\$\begingroup\$ Regarding the ScheduledExecutorService advice, that runs a task every second in a separate thread, if I'm not mistaken? I think it's better to do the processing and printout together in the main thread, since when the app starts it should print the messages and finish gracefully. My intention was to free the reading of messages of any unnecessary processing. If i.e. messages get dropped for some reason the main therad should block, just like a normal ping command would do, in this case it would be blocked waiting for the queue to get populated. I'll post how the printout looks. \$\endgroup\$pord911– pord9112017年01月25日 17:38:53 +00:00Commented Jan 25, 2017 at 17:38
-
1\$\begingroup\$ I'm not sure to catch your concerns about the delay and calculation time. But for my point of view (without all your context) it seems clear to me that you have two components, a
Reader
and aProcessor
that should works asynchronously. You can useBlockingQueue#drainTo
to collect all the messages accumulated between two processing. \$\endgroup\$gervais.b– gervais.b2017年01月26日 10:43:38 +00:00Commented Jan 26, 2017 at 10:43 -
\$\begingroup\$ The
ScheduledExecutor
runs in a separate thread but this is not a problem since theReader
is blocking. You first schedule theProcessor
, then start the reader that will finish on OKBYE and finally shutdown the executor. \$\endgroup\$gervais.b– gervais.b2017年01月26日 10:45:55 +00:00Commented Jan 26, 2017 at 10:45 -
1\$\begingroup\$ I made this to "explain" it : github.com/gervaisb/stackexchange-codereview/tree/master/src/… \$\endgroup\$gervais.b– gervais.b2017年01月30日 08:35:18 +00:00Commented Jan 30, 2017 at 8:35