I have a class that implements Queue and draws values from other queues which may still be referenced outwith it. I want my method to draw values from the contained queues, using synchronized locks on them to ensure thread safety with other code that uses synchronized locks on the queues.
The way I've tried to achieve this is to have my method loop over all the queues indefinitely. It stores a queue's head value as the candidate if that value is the next one (the lowest by key). When the loop reaches the candidate's queue again, it checks whether the head value, or the variable the queues are being measured by, has changed since the last pass. If it has, the stored values are updated and the loop carries on; if it hasn't, that value is returned. My logic is that, at that point, all queues have been checked, the currently stored value/queue was checked earliest, and it has been reconfirmed to be the next value.
Is this the ideal way to create a thread-safe version of this method? Or would there be a better way?
Collection<Queue<T>> memberQueues;
final Lambda<T, Comparable> keyGetter;

public T get(boolean remove)
{
    // workaround to ensure thread-safety when synchronized locks can't extend past the block they're declared in.

    // Work on a copy of memberQueues
    List<Queue<T>> memberQueuesCopy;
    synchronized(memberQueues)
    { memberQueuesCopy = new ArrayList<Queue<T>>(memberQueues); }

    // Declare variables and initialise with last member of memberQueuesCopy.
    // If it checks every variable and the last one is next, then I don't think I need to check again.
    Queue<T> nextQueue = memberQueuesCopy.get(memberQueuesCopy.size() - 1);
    T nextValue = nextQueue.peek();
    Comparable nextValueComparable = keyGetter.getMember(nextValue);

    // Check all members of memberQueuesCopy. Find the lowest and hold the value until it gets to it again incase
    // any values have changed. When it gets back to the current lowest value, check whether the value it's being
    // sorted by has changed - and if it has, use its new values and run through all members again. If it hasn't,
    // return it.
    // I want a less contrived method. Suggestions that maintain thread safety?
    for(;;)
    {
        for(Queue<T> i : memberQueuesCopy)
        {
            synchronized(i)
            {
                T iValue = i.peek();
                Comparable iComparable = keyGetter.getMember(iValue);

                if(i == nextQueue)
                {
                    if(nextValue == iValue && nextValueComparable.equals(iComparable))
                    {
                        if(remove)
                            return nextQueue.remove();
                        else
                            return iValue;
                    }

                    nextValue = nextQueue.peek();
                    nextValueComparable = keyGetter.getMember(nextValue);
                }
                else
                {
                    if(iComparable.compareTo(nextValueComparable) < 0)
                    {
                        nextQueue = i;
                        nextValue = iValue;
                        nextValueComparable = iComparable;
                    }
                }
            }
        }
    }
}
- Could you please clarify what you mean when you say: I want the method to draw values from the queues by checking over all values indefinitely and after a value is checked twice. I am not sure I understand that. – rolfl, Jul 3, 2014 at 23:25
- Yes, please take a few minutes and rewrite your first paragraph to explain clearly what your goal is. It would be even better if you explained the larger context in which this will be used. It seems very complex: I don't understand your explanation or your code. I'm sure there is a much simpler solution, which most likely exists in java.util.concurrent. – toto2, Jul 4, 2014 at 0:26
3 Answers
Code Style
Java Code Style puts the open-brace at the end of the line, not the start of the next line. For example, you have:
if(i == nextQueue)
{
but that should be:
if(i == nextQueue) {
Variable conventions
i as a variable name is a great idea if the variable is the control integer in a for loop. In your case, I presume it is short for 'item' or something, but calling a Queue i is unconventional.
As it happens, the letter q is perfect as a substitute.
Now, your nextQueue variable is actually initialised to the last queue, which is odd.
Function extraction
With synchronization, return values from methods are often a great help for readability. Consider this code you have:
// Work on a copy of memberQueues
List<Queue<T>> memberQueuesCopy;
synchronized(memberQueues)
{ memberQueuesCopy = new ArrayList<Queue<T>>(memberQueues); }
Which should really be written as:
// Work on a copy of memberQueues
List<Queue<T>> memberQueuesCopy;
synchronized(memberQueues) {
memberQueuesCopy = new ArrayList<Queue<T>>(memberQueues);
}
and would be even better if written as:
private final List<Queue<T>> copyQueues() {
synchronized(memberQueues) {
return new ArrayList<Queue<T>>(memberQueues);
}
}
and then:
// Work on a copy of memberQueues
List<Queue<T>> memberQueuesCopy = copyQueues();
Bugs
There are three bugs I should point out:
1. IndexOutOfBoundsException if memberQueues is empty:
Queue<T> nextQueue = memberQueuesCopy.get(memberQueuesCopy.size() - 1);
2. (and bug 3) NullPointerException if any of the queues are empty (in some combinations); one bug is on iComparable, the other on nextValueComparable:
T nextValue = nextQueue.peek();
Comparable nextValueComparable = keyGetter.getMember(nextValue);
....
T iValue = i.peek();
Comparable iComparable = keyGetter.getMember(iValue);
....
if(nextValue == iValue && nextValueComparable.equals(iComparable))
....
if(iComparable.compareTo(nextValueComparable) < 0)
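As a rough illustration of the guards that are missing, here is a minimal sketch. It assumes a hypothetical helper named lowestHead, uses java.util.function.Function in place of the question's Lambda type, and assumes the key getter never returns null. It is deliberately not thread-safe; it only shows the empty-collection and empty-queue checks.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.function.Function;

final class HeadGuards {
    /**
     * Returns the head with the smallest key across all queues, or null if
     * the collection is empty or every queue in it is empty.
     * Not thread-safe: this only illustrates the missing guards.
     */
    static <T, K extends Comparable<K>> T lowestHead(
            Collection<? extends Queue<T>> queues, Function<T, K> keyGetter) {
        T best = null;
        K bestKey = null;
        for (Queue<T> q : queues) {   // an empty collection simply skips the loop
            T head = q.peek();        // peek() returns null for an empty queue
            if (head == null) {
                continue;             // never hand null to the key getter
            }
            K key = keyGetter.apply(head);
            if (bestKey == null || key.compareTo(bestKey) < 0) {
                best = head;
                bestKey = key;
            }
        }
        return best;
    }
}
```

Starting from "no candidate yet" rather than from the last queue removes the need to index into the copy at all, so the empty-collection case falls out naturally.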
- @rolfl I guess the third bu – palacsint, Jul 4, 2014 at 7:39
- @palacsint - what was I thinking? I jus – rolfl, Jul 4, 2014 at 14:21
- Thank you for the in-depth reply :D I put braces on the next line in my own code because I find it more easily readable. I've always used i, j, k, etc. as the index in for loops and this habit just carried over to for-each loops. I did check for an empty queues collection and empty queues in the previous version of the method, I just must have forgotten to carry them to this version. Can I draw from the fact you didn't comment on the algorithm that there isn't a better way to achieve this? – Hanii Puppy, Jul 4, 2014 at 14:24
- @HaniiPuppy - no, you cannot make that assessment that it is doing it the best way. Frankly, I was, and still am, uncertain about the purpose of the code. I added a comment to your question an hour or so before I answered, and I only covered the things I was sure of. The for(;;) is a really concerning pattern, but I can't think of a better way because I don't understand the bigger context of what you are trying to do. There are more problems.... – rolfl, Jul 4, 2014 at 14:32
- I've rewritten the opening paragraph. EDIT: the for(;;) is a placeholder atm, I'll include a threshold for the number of times it can loop through before I've established that something's wrong and it throws an exception, but atm, I'm more concerned with the algorithm. – Hanii Puppy, Jul 4, 2014 at 14:34
Without the larger context it's hard to know whether having one queue reading from other queues is really the best design.
Some possible ideas, which I can't choose between without further clarification:
- Eliminate the smaller queues and just make one big master queue. If other parts of the system need to read from it, that business logic can be handled before objects are put on the queue or after they are taken off.
- Rather than having one master queue, have a java.util.Map whose values are queues. You can use Object.wait and Object.notify to notify listeners that a new element is available, or use a more explicit listener pattern if that's needed.
- As @toto2 said, there may be something in java.util.concurrent that is the best fit. In particular, java.util.concurrent.locks.ReadWriteLock may be useful.
The Producer/Consumer threads using a Queue design pattern may also be worth checking out.
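The wait/notify idea from the list above, combined with the producer/consumer pattern, might look roughly like this. It is only a sketch: NotifyingQueue is a hypothetical class name, and it wraps a single queue rather than the map of queues suggested above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class NotifyingQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    /** Producer side: add an element and wake any waiting consumers. */
    synchronized void put(T item) {
        items.add(item);
        notifyAll();
    }

    /** Consumer side: block until an element is available, then remove it. */
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {   // loop guards against spurious wake-ups
            wait();                 // releases the monitor while waiting
        }
        return items.remove();
    }
}
```

Both methods synchronize on the same monitor (the NotifyingQueue instance), which is what lets wait() and notifyAll() pair up. A consumer thread would simply call take() in a loop while producers call put().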
I still don't quite understand what you are trying to accomplish, even after your short rewrite. Could you please describe the full context of this problem?
My general advice when I see someone using synchronized, wait/notify or explicit lock() calls is to tell them to stop using those primitive concurrency constructs and instead make good use of the modern patterns in java.util.concurrent.
In your case, it seems -- without fully understanding -- that you should transfer all incoming data from many queues into one BlockingQueue. Note that since this queue is blocking, you don't need the infinite for(;;) loop.
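A minimal sketch of that idea follows. The names MergedQueueDemo, Event and runDemo are hypothetical, and PriorityBlockingQueue is my own choice of BlockingQueue here, picked on the assumption that the consumer wants the lowest key first, as in the question. A plain LinkedBlockingQueue would do if arrival order is enough.

```java
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

final class MergedQueueDemo {
    /** Hypothetical event type; key stands in for whatever keyGetter returns. */
    static final class Event {
        final int key;
        final String source;
        Event(int key, String source) { this.key = key; this.source = source; }
    }

    /** Starts one producer per source, waits for both, then drains in key order. */
    static int[] runDemo() throws InterruptedException {
        BlockingQueue<Event> merged =
                new PriorityBlockingQueue<>(11, Comparator.comparingInt((Event e) -> e.key));

        // Each producer writes straight into the shared queue; no external locking.
        Thread a = new Thread(() -> { merged.add(new Event(3, "a")); merged.add(new Event(1, "a")); });
        Thread b = new Thread(() -> { merged.add(new Event(2, "b")); merged.add(new Event(4, "b")); });
        a.start();
        b.start();
        a.join();
        b.join();

        int[] keys = new int[4];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = merged.take().key;   // take() blocks while the queue is empty
        }
        return keys;
    }
}
```

The consumer side reduces to a single take() call, and the queue itself handles both the ordering and the thread safety that the question's loop tries to achieve by hand.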
More advanced, but perhaps even better suited to your problem -- which I don't fully understand -- would be to use "reactive programming", as in the RxJava library. In that paradigm, you handle streams (Observables) of events. In your case you would just merge various streams of events.