I have a method that takes a Partition enum as its parameter. It will be called by multiple background threads at around the same time, each passing a different partition value. Here dataHoldersByPartition is a map from Partition to ConcurrentLinkedQueue<DataHolder>.
private void validateAndSend(final Partition partition) {
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) {
        byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
        if (clientKeyBytes.length > 255)
            continue;
        byte[] processBytes = dataHolder.getProcessBytes();
        int clientKeyLength = clientKeyBytes.length;
        int processBytesLength = processBytes.length;
        int additionalLength = clientKeyLength + processBytesLength;
        if (totalSize + additionalLength > 50000) {
            Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
            sendToDatabase(message.getAddress(), message.getLocation());
            clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
            totalSize = 0;
        }
        clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
        totalSize += additionalLength;
    }
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        sendToDatabase(message.getAddress(), message.getLocation());
    }
}
In the method, I iterate over the dataHolders CLQ and extract clientKeyBytes and processBytes from each element. Here is the validation that I am supposed to do:
- If the clientKeyBytes length is greater than 255, I skip the element and continue iterating.
- I keep incrementing the totalSize variable, which is the sum of clientKeyLength and processBytesLength; totalSize should always be less than 50000 bytes.
- As soon as it reaches the 50000 limit, I send the clientKeyBytesAndProcessBytesHolder map to the sendToDatabase method, clear the map, reset totalSize to 0 and start populating again.
- If it doesn't reach that limit and dataHolders becomes empty, I send whatever the map holds.
Basically, I have to make sure that whenever the sendToDatabase method is called, the clientKeyBytesAndProcessBytesHolder map has a size of less than 50000 (the sum of all key and value lengths). It should never be called with a size greater than 50000 (50k).
Question:
My code works fine, but as you can see, I am calling the sendToDatabase method in two places: once as soon as the 50k limit is reached, and once when the limit was not reached because we ran out of elements.
Is there any way to rewrite this so that I call the sendToDatabase method just once and still fulfill both conditions? I am working with Java 7.
- You have two points in your program flow where this method is called, so I don't see a reason or a way to call it once AND optimize the code. You might add some complex if/else constructs to your loop, but I don't think that is what it is for. Maybe use two loops, break the inner one when 50k is reached, and check in the outer loop why you broke out to decide whether to keep looping; that does not sound better. If it is all about not writing the two lines 'Message ... new Message ...' and 'send' twice, you might use encapsulation. – SchreiberLex, Feb 14, 2017 at 10:22
2 Answers
I waited a few days to see other answers to your question. Now I feel I can add my 2 cents.
First off, the obvious: This is one single method. It does:
- Polling a queue
- Serialising a message into a byte[]
- Validating the messages
- Packaging in 50Kb packages
- Sending chunks to a Database
The obvious thing to do is split those actions into individual methods, and move each method into the object where it belongs, which allows a nicer syntax (right now, too much lives in a single method of the current object, so your hands are tied).
Our goal is to make your code more readable, like this:
public void batchSend(final Partition partition) {
    Queue<DataHolder> dataHolderQueue = dataHoldersByPartition.get(partition);
    DataHolder data = dataHolderQueue.poll();
    Message message = new Message();
    while (data != null) {
        if (data.isValid()) {
            if (!message.hasRoomFor(data)) {
                // Message is at maximum capacity, send it and start a new one
                database.send(message);
                message = new Message();
            }
            // Append after the capacity check so the entry that did not fit is not dropped
            message.append(data);
        }
        data = dataHolderQueue.poll();
    }
    if (!message.isEmpty()) {
        database.send(message); // Queue is empty, sending remaining message
    }
}
I simply modified the Message Object, which now holds a bunch of DataHolder's serialized data and tells you when you reach a certain size limit. I don't really understand how you concatenate the messages together (it's all hidden in the Message class, which was not included), so most of the below is just a guess:
public class Message {
    public static final int MAX_BYTES_PER_MESSAGE = 50000;

    private final Map<byte[], byte[]> data = new HashMap<>();
    private int size = 0;
    // ... Other fields I don't know about ...

    public boolean hasRoomFor(DataHolder holder) {
        // getSizeIncludingHeader() is assumed to exist on DataHolder
        return size + holder.getSizeIncludingHeader() <= MAX_BYTES_PER_MESSAGE;
    }

    public void append(DataHolder holder) {
        byte[] clientKeyBytes = holder.getClientKey().getBytes(StandardCharsets.UTF_8);
        byte[] processBytes = holder.getProcessBytes();
        size += clientKeyBytes.length + processBytes.length;
        // Maybe throw an IllegalStateException if size > MAX_BYTES_PER_MESSAGE?
        data.put(clientKeyBytes, processBytes);
    }

    public boolean isEmpty() {
        return data.isEmpty();
    }

    // provide getters on the data field for the sendToDatabase() method
    // ... Other methods I don't know about ...
}
I also modified the DataHolder Object to add:
public boolean isValid() {
    return getClientKey().getBytes(StandardCharsets.UTF_8).length <= 255;
}
Indeed, its validity seems to be an intrinsic property of a DataHolder. By this I mean a DataHolder either is valid or isn't, and that does not depend on external factors. This makes it a perfect candidate for a method of the DataHolder Object.
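Putting the pieces above together, here is a minimal runnable sketch of the batching loop. It assumes a simplified Batch class in place of Message and plain byte[] payloads in place of DataHolder; Batch, Batcher and drain are hypothetical names used only for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for Message: collects payloads up to a byte budget. */
final class Batch {
    static final int MAX_BYTES = 50000;

    private final List<byte[]> payloads = new ArrayList<>();
    private int size = 0;

    boolean hasRoomFor(byte[] payload) {
        return size + payload.length <= MAX_BYTES;
    }

    void append(byte[] payload) {
        if (!hasRoomFor(payload)) {
            throw new IllegalStateException("batch would exceed " + MAX_BYTES + " bytes");
        }
        payloads.add(payload);
        size += payload.length;
    }

    boolean isEmpty() {
        return payloads.isEmpty();
    }

    int size() {
        return size;
    }
}

final class Batcher {
    /** Drains the queue into batches, none larger than Batch.MAX_BYTES. */
    static List<Batch> drain(Queue<byte[]> queue) {
        List<Batch> batches = new ArrayList<>();
        Batch current = new Batch();
        byte[] payload;
        while ((payload = queue.poll()) != null) {
            if (payload.length > Batch.MAX_BYTES) {
                continue; // can never fit in any batch; skipped in this sketch
            }
            if (!current.hasRoomFor(payload)) {
                batches.add(current); // full: hand it over and start a new one
                current = new Batch();
            }
            current.append(payload); // the entry that did not fit goes into the new batch
        }
        if (!current.isEmpty()) {
            batches.add(current); // queue is empty: keep the remainder
        }
        return batches;
    }
}
```

With batching separated from sending like this, the caller only needs one send call, inside a loop such as `for (Batch b : Batcher.drain(queue)) { database.send(b); }`. The trade-off is that all batches are built before the first one is sent.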
Weird message usage
This is weird:
Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
sendToDatabase(message.getAddress(), message.getLocation());
You're creating an instance of message, but not using the instance, only some of its fields, via getters. The point of using a Message Object is to keep its constitutive elements consistent inside, so those elements are only consistent if using the instance.
By allowing sendToDatabase to take two separate parameters, you're allowing the caller to pass two inconsistent values:
sendToDatabase(anAddress, aTotallyUnrelatedLocation);
The obvious fix is to pass in the whole Message Object and let the method extract the right data:
boolean sendToDatabase(Message message) {
    Address address = message.getAddress();
    Location location = message.getLocation();
    // ... do Stuff
    return successStatus;
}
It is also syntactically more pleasing to send a Message rather than an address and a location.
By the way, why would the message body not be sent as well? This is confusing. I would have expected sendToDatabase(Message message, Address address) at least. Maybe the Location is the Message body? It does not make sense to me.
GC Handling
This should never be written:
clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
The GC knows its stuff. Don't 'help' it. It knows if the collection needs to be GC'ed, or if the data in it does. This call doesn't even make the memory available!
If the collection must be cleared, it must be for business reasons, not because of GC.
Also be careful: if you clear() it, but sendToDatabase() spawned a Thread which holds onto the reference a bit longer, you've just deleted all the data before it was sent.
The best would be to overwrite the reference to the collection with a new one:
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
With this, you signal to the GC that it can get rid of the Collection (you no longer keep a handle), so when no one else holds a reference either, it will be recycled.
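To make the difference concrete, here is a small sketch, assuming a second reference (standing in for whatever a sender thread might hold) to the same map. ClearVersusReplace and its method names are hypothetical, chosen only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ClearVersusReplace {
    /** Size seen through a second reference after the producer calls clear(). */
    static int sizeSeenAfterClear() {
        Map<String, String> batch = new HashMap<>();
        batch.put("key", "value");
        Map<String, String> heldByConsumer = batch; // e.g. a reference kept by a sender
        batch.clear();                               // empties the one shared map
        return heldByConsumer.size();
    }

    /** Size seen through a second reference after the producer swaps in a new map. */
    static int sizeSeenAfterReplace() {
        Map<String, String> batch = new HashMap<>();
        batch.put("key", "value");
        Map<String, String> heldByConsumer = batch;
        batch = new HashMap<>();                     // only rebinds the local variable
        batch.put("next", "entry");
        return heldByConsumer.size();
    }
}
```

After clear(), the consumer's reference sees an empty map; after the reassignment, it still sees its single entry, and the old map becomes collectable once the consumer drops it.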
Late validation
I wonder why you validate() your messages so late. You only validate them just before sending them. You could do this check earlier, for example when the data is put into the queues.
This is the "Fail Fast" principle: Don't keep corrupt data in your system, flush them as soon as you can.
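A minimal sketch of validating at enqueue time, assuming the queue holds just client keys to keep the example short. ValidatingQueue and its members are hypothetical names; the real code would enqueue DataHolder objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class ValidatingQueue {
    static final int MAX_KEY_BYTES = 255;

    private final Queue<String> keys = new ConcurrentLinkedQueue<>();

    /** Rejects oversized keys before they enter the queue; returns whether the key was accepted. */
    boolean offer(String clientKey) {
        if (clientKey.getBytes(StandardCharsets.UTF_8).length > MAX_KEY_BYTES) {
            return false; // fail fast: invalid data never reaches the consumer
        }
        return keys.offer(clientKey);
    }

    int size() {
        return keys.size();
    }
}
```

With this in place, the consuming loop no longer needs the 255-byte check, because nothing invalid can be polled.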
Edit:
One shortcoming of this code is that it might perform the DataHolder-to-bytes conversion several times. If the conversion is cheap, that's fine. If it isn't, here is a workaround:
Create a ValidatedRawData Object which contains just the raw byte data.
It would be created by the DataHolder itself, in a rewrite of the validate() method:
public class DataHolder {
    // ... stuff ...

    /** Now returns null if validation fails */
    public ValidatedRawData validate() {
        byte[] clientKeyBytes = getClientKey().getBytes(StandardCharsets.UTF_8);
        if (clientKeyBytes.length > 255) {
            return null;
        }
        byte[] processBytes = getProcessBytes();
        return new ValidatedRawData(clientKeyBytes, processBytes);
    }
}
ValidatedRawData is just a holder, with Getters.
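One possible shape for that holder, as a sketch only. The getter names and the getSize() helper are assumptions, and the defensive array copies are an optional extra that costs one copy per call.

```java
/** Hypothetical immutable holder for bytes that already passed validation. */
final class ValidatedRawData {
    private final byte[] clientKeyBytes;
    private final byte[] processBytes;

    ValidatedRawData(byte[] clientKeyBytes, byte[] processBytes) {
        // Copy on the way in so later changes to the caller's arrays are not visible here
        this.clientKeyBytes = clientKeyBytes.clone();
        this.processBytes = processBytes.clone();
    }

    byte[] getClientKeyBytes() {
        return clientKeyBytes.clone(); // copy on the way out as well
    }

    byte[] getProcessBytes() {
        return processBytes.clone();
    }

    /** Combined length, which is what the 50000-byte check needs. */
    int getSize() {
        return clientKeyBytes.length + processBytes.length;
    }
}
```

Because the bytes are computed once in validate(), both the size check and the append can work from the same object without converting again.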
- Thanks again for the great code review. I have a few points of confusion. This is how my Message and DataHolder classes look with the current code in my question. The first constructor takes two parameters and the other constructor takes one. After calling the first constructor with two parameters, which is what I currently do in my question, I call the serialize() method to serialize everything into one byte array; with the other constructor, which takes only a byte array, you can deserialize everything to get the individual fields. – david, Feb 16, 2017 at 4:39
- Now since I have all final fields in the Message class, I am not sure I can have a default no-arg constructor as you mentioned in your answer, so that is where I am confused. Maybe after you look into my code for Message and DataHolder you can help me figure out the best way to do this. I can certainly add an isValid() method to DataHolder to do the 255-length validation, but I am mainly confused about the Message class. – david, Feb 16, 2017 at 4:39
- Don't take this review so literally. I don't have your entire code, so I'm making stuff up as I go to fill the gaps. Is having a default Ctor a requirement for anything I mentioned? No, I just didn't bother writing boilerplate code, and I didn't have the original class. Feel free to adapt anything I say to your liking, such as adding a no-args Ctor or not. It's not so much about making better code as it is about understanding how to get there. That is why there's less code than text in my answers. – MrBrushy, Feb 16, 2017 at 7:49
- What you should be thinking about is, for example, "why do I just need a Location and an Address to construct a Message?" A letter requires an Address and a Content, for example. Same for an email. So either one of these args is not named correctly or the message class is not a message, so something is off. Also, when you think about it, you serialise DataHolder to byte[], add the bytes to a map in a class, then serialise that class again. That's two layers of serialization! Take a higher-level approach to this feedback: constructors don't matter, what you're doing does. – MrBrushy, Feb 16, 2017 at 7:56
- When I posted this question, I was not aware that the Message and DataHolder classes were needed, but now I realize they are. Yes, you are right about "or the message class is not a message": the class names are a little bit off. I will fix that on my side to make it clearer. I am always bad at naming stuff. I understood your approach, but my confusion is how to adapt my Message class to do the same thing you suggested in your answer, or how to adapt your Message class and put my stuff in it. – david, Feb 16, 2017 at 8:49
General
I think you are missing the following piece of semantics to achieve what you want:
Are there potential messages to send?
This can be represented by:
potentialMessagesToSend = dataHolder != null || !clientKeyBytesAndProcessBytesHolder.isEmpty()
If you fail to represent semantics as structure, you will have to handle them algorithmically, as you did.
Refactorings
I removed the "continue" and negated the if-statement, as it prevented me from applying the refactorings.
I introduced the "guard" variable "potentialMessagesToSend" and initialized it to true.
I abstracted away the path on which a Message was created. If a Message is created, it will be sent no matter where it came from.
Code
I do not know if I got it totally right, but you can get the idea:
private void validateAndSend(final Partition partition) {
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    boolean potentialMessagesToSend = true;
    while (potentialMessagesToSend) {
        DataHolder dataHolder = dataHolders.poll();
        potentialMessagesToSend = dataHolder != null || !clientKeyBytesAndProcessBytesHolder.isEmpty();
        Message message = null;
        if (dataHolder != null) {
            byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
            if (clientKeyBytes.length <= 255) {
                byte[] processBytes = dataHolder.getProcessBytes();
                int clientKeyLength = clientKeyBytes.length;
                int processBytesLength = processBytes.length;
                int additionalLength = clientKeyLength + processBytesLength;
                if (totalSize + additionalLength > 50000) {
                    // Limit reached: hand a copy to the Message, then start a fresh batch
                    message = new Message(partition, new HashMap<>(clientKeyBytesAndProcessBytesHolder));
                    clientKeyBytesAndProcessBytesHolder.clear();
                    totalSize = 0;
                }
                clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
                totalSize += additionalLength;
            }
        } else if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
            // Queue is drained: flush the remainder and empty the map so the loop can end
            message = new Message(partition, new HashMap<>(clientKeyBytesAndProcessBytesHolder));
            clientKeyBytesAndProcessBytesHolder.clear();
            totalSize = 0;
        }
        if (message != null) {
            sendToDatabase(message.getAddress(), message.getLocation());
        }
    }
}
One thing I noticed afterwards: you have to make a copy of the hashmap and pass that copy into the Message object. In general it is a good thing to make defensive copies and not pass data structures through the system by reference.
- I guess you missed one part: as soon as it reaches the 50k limit, I want to send data to the database, but in your example it's not there, or maybe I have missed something? – david, Feb 14, 2017 at 16:31
- As soon as it reaches the 50k limit, the Message is created. The last three lines within the loop will then send the Message because it is there. – oopexpert, Feb 14, 2017 at 16:41
- I am still not getting it... As soon as it reaches the limit, how do you send it right away? That is the part I am not able to understand, because in your case I see that once it reaches the limit you go back, start polling again and populate the clientKeyBytesAndProcessBytesHolder map, which will be more than 50k now? – david, Feb 14, 2017 at 16:48
- Did you notice that your previous while loop condition is now used in an if statement? The while loop is around everything. – oopexpert, Feb 14, 2017 at 17:40