General
#General II think you are missing following semantic to achieve what you want:
#Refactorings
Refactorings
#Code
Code
#General I think you are missing following semantic to achieve what you want:
#Refactorings
#Code
General
I think you are missing following semantic to achieve what you want:
Refactorings
Code
One thing I recognized afterwards. You have to make a copy of the hashmap and then pass it into the Message object. In general it is a good thing to make defensive copies and not pass datastructures through the system by reference.
One thing I recognized afterwards. You have to make a copy of the hashmap and then pass it into the Message object. In general it is a good thing to make defensive copies and not pass datastructures through the system by reference.
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
boolean potentialMessagesToSend = true;
while (potentialMessagesToSend) {
DataHolder dataHolder = dataHolders.poll();
potentialMessagesToSend = dataHolder != null || !clientKeyBytesAndProcessBytesHolder.isEmpty();
Message message = null;
if (dataHolder != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length <= 255) {
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
message = new Message(partition, copy(clientKeyBytesAndProcessBytesHolder));
clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
} if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
}
if (message != null) {
sendToDatabase(message.getAddress(), message.getLocation());
}
}
}
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
boolean potentialMessagesToSend = true;
while (potentialMessagesToSend) {
DataHolder dataHolder = dataHolders.poll();
potentialMessagesToSend = dataHolder != null || !clientKeyBytesAndProcessBytesHolder.isEmpty();
Message message = null;
if (dataHolder != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length <= 255) {
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
} if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
}
if (message != null) {
sendToDatabase(message.getAddress(), message.getLocation());
}
}
}
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
boolean potentialMessagesToSend = true;
while (potentialMessagesToSend) {
DataHolder dataHolder = dataHolders.poll();
potentialMessagesToSend = dataHolder != null || !clientKeyBytesAndProcessBytesHolder.isEmpty();
Message message = null;
if (dataHolder != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length <= 255) {
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
message = new Message(partition, copy(clientKeyBytesAndProcessBytesHolder));
clientKeyBytesAndProcessBytesHolder.clear(); // watch out for gc
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
} if (!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
}
if (message != null) {
sendToDatabase(message.getAddress(), message.getLocation());
}
}
}