I have bunch of keys and values that I want to send to our messaging queue by packing them in one byte array. I will make one byte array of all the keys and values which should always be less than 50K and then send to our messaging queue.
Packet class:
public final class Packet implements Closeable {
private static final int MAX_SIZE = 50000;
private static final int HEADER_SIZE = 36;
private final byte dataCenter;
private final byte recordVersion;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
private int pendingItems = 0;
public Packet(final RecordPartition recordPartition) {
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
final long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
private void addHeader(final ByteBuffer buffer, final int items) {
buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
.putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
.put(replicated);
}
private void sendData() {
if (itemBuffer.position() == 0) {
// no data to be sent
return;
}
final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
addHeader(buffer, pendingItems);
buffer.put(itemBuffer);
SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array());
// SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
itemBuffer.clear();
pendingItems = 0;
}
public void addAndSendJunked(final byte[] key, final byte[] data) {
if (key.length > 255) {
return;
}
final byte keyLength = (byte) key.length;
final byte dataLength = (byte) data.length;
final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
final int newSize = itemBuffer.position() + additionalSize;
if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
sendData();
}
if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
throw new AppConfigurationException("Size of single item exceeds maximum size");
}
final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
// data layout
itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength)
.put(data);
pendingItems++;
}
@Override
public void close() {
if (pendingItems > 0) {
sendData();
}
}
}
Below is the way I am sending data. As of now my design only permits to send data asynchronously by calling sender.sendToQueueAsync
method in sendData()
method.
private void validateAndSend(final RecordPartition partition) {
final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
final Packet packet = new Packet(partition);
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
dataHolder.getProcessBytes());
}
packet.close();
}
Now I need to extend my design so that I can send data in three different ways. It is upto user to decide which way he wants to send data to either "sync" or "async".
- I need to send data asynchronously by calling
sender.sendToQueueAsync
method. - or I need to send data synchronously by calling
sender.sendToQueueSync
method. - or I need to send data synchronously but on a particular socket by calling
sender.sendToQueueSync
method. In this case I need to passsocket
variable somehow so thatsendData
knows about this variable.
SendRecord class:
public class SendRecord {
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100).build();
private static class Holder {
private static final SendRecord INSTANCE = new SendRecord();
}
public static SendRecord getInstance() {
return Holder.INSTANCE;
}
private SendRecord() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetry();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void handleRetry() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage message : messages) {
if (message.hasExpired()) {
if (message.shouldRetry()) {
message.markResent();
doSendAsync(message);
} else {
cache.invalidate(message.getAddress());
}
}
}
}
// called by multiple threads concurrently
public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, true);
cache.put(address, m);
return doSendAsync(m);
}
// called by above method and also by handleRetry method
private boolean doSendAsync(final PendingMessage pendingMessage) {
Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(liveSocket.get().getSocket());
} finally {
msg.destroy();
}
}
// called by send method below
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// this returns instantly
return msg.send(socket);
} finally {
msg.destroy();
}
}
// called by multiple threads to send data synchronously without passing socket
public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
// called by a threads to send data synchronously but with socket as the parameter
public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage record = cache.getIfPresent(address);
if (record != null) {
record.ackReceived();
cache.invalidate(address);
}
}
}
Callers will only call either of below three methods:
- sendToQueueAsync by passing two parameters
- sendToQueueSync by passing two parameters
- sendToQueueSync by passing three parameters
How should I design my Packet
and SendRecord
class so that I can tell Packet
class that this data needs to be send in either of above three ways to my messaging queue. It is upto user to decide which way he wants to send data to messaging queue. As of now the way my Packet
class is structured, it can send data only in one way.
3 Answers 3
Your main problem is the SendRecord.getInstance()...
calls in sendData()
. My recommendation is to abstract that out as a policy. A sketch of a minimal change approach follows.
Start by creating a QueuePolicy
interface
public interface QueuePolicy {
public boolean sendToQueue(final long address, final byte[] encodedRecords);
}
Pass this to the Packet
constructor public Packet(final RecordPartition recordPartition, final QueuePolicy qPolicy)
, and store in an instance variable i.e. this.qPolicy = qPolicy;
.
Replace the SendRecord.getInstance()...
call in sendData()
with
qPolicy.sendToQueue(address, buffer.array());
Now, in validateAndSend(...)
, construct an object that implements the interface and pass that to the Packet
constructor. If your client doesn't have access to validateAndSend(...)
then you'll need to add a QueuePolicy
parameter to validateAndSend
and have the client pass it in.
An example class that implements the QueuePolicy
is below. The other two variants should follow the same pattern (but without the Socket
instance);
public class QPolicyAsyncWithSocket implements QueuePolicy {
private final Socket socket;
public QPolicyAsyncWithSocket (Socket socket) {
this.socket = socket;
}
public boolean sendToQueue(final long address, final byte[] encodedRecords) {
return SendRecord.getInstance().sendToQueueSync(address, encodedRecords, socket);
}
}
-
I liked your idea of passing the implementation of
QueuePolicy
interface toPacket
constructor and then we can call directlysendToQueue
method on that implementation. That makes sense. Now I am thinking more on this, I have three implementations ofQueuePolicy
and from all those implementations I am just calling corresponding method ofSendRecord
class. Right? So can we not get rid of "SendRecord" class altogether and have them implemented in these three implementations ofQueuePolicy
interface somehow by making sure all the logic of "SendRecord" class is there?user1950349– user195034901/16/2018 21:36:11Commented Jan 16, 2018 at 21:36 -
@user1950349 Possibly. It’s not clear from the context why SendRecord is a singleton. If the client will always pick one policy, that you’ll reuse, or it’s actually fine to have multiple queues then this should work ok. If not, or you’re not sure, it may be best to wrap the singleton.Alex– Alex01/16/2018 22:01:00Commented Jan 16, 2018 at 22:01
-
Idea is - sometimes we have to send data through "async" and sometimes we can send data through "sync" so it's up to clients to decide how they wants to do it. That's why I was thinking to have a base abstract class kind of thing. I am trying to do it this way now but stuck as of now on the common implementations for them.user1950349– user195034901/16/2018 22:06:03Commented Jan 16, 2018 at 22:06
-
@user1950349 I understand but the SendRecord singleton looks like it was written to ensure that there’s only one thread and one cache. If you move away from that model, by moving the implementation to the QueuePolicy concrete classes, you’ve probably got some complex syncing problems. TBH the SendRecord class looks reasonably sensible. Unless you have a strong reason to remove it, I’d be inclined to leave it. Up to you of course.Alex– Alex01/16/2018 22:52:56Commented Jan 16, 2018 at 22:52
-
I have a quick question on this. With your suggestion, I can pass queue policy implementation in the constructor of
Packet
class and then call that implementation to send data. So how can I know whether data was successfully sent or not? Because my methods inPacket
class doesn't return a boolean and they are just void. For example: let's saydataHolders
only has one element in it so when we callclose
method of packet class, then only it will send data so do I need to evaluate boolean values both fromaddAndSendJunked
andclose
method?user1950349– user195034903/06/2018 01:29:09Commented Mar 6, 2018 at 1:29
Consider using FutureTask to simplify your interface. A FutureTask
instance is nothing more than a task that once run will return a result. The Executor
classes play well with them too, because they are just an extension of a Runnable
class.
Your SendRecord
class can therefore by simplified into sendToQueue
method accepting two parameters and returning a FutureTask
. You then need only append to your Executor
to manage the execution of these tasks in the manner of your choosing. You can also chain future tasks together should the result of one be required by a successive future task should you wish it.
The task performed by FutureTask
would then therefore be responsible for:
- Performing sendoff of packet
- Await response until timeout.
- If response not received or if response is failure:
- If retry count is zero, set result to failure.
- Otherwise decrement retry count, and return to step 1.
- Otherwise response received and success:
- Set result to success.
To simply perform a synchronous call, you'd need only to call get()
to force the task to resolve to success or failure, though ideally you should avoid forcing resolution of the task as long as possible, as it allows you to work in parallel for as long as possible.
In doing it this way, you have full flexibility and none of the clunky mixing of synchronous and asynchronous implementations, while still maintaining full control over when these tasks resolve. The actual sending of the packet is also entirely kept within its own world making adjustments far more straightforward. It would be easy to see how you could adapt a FutureTask
instance to chain with other calls as well.
public class PacketChain extends FutureTask<PacketResponse> {
private FutureTask<PacketResponse> task;
private PacketChain next = null;
public PacketChain(FutureTask<PacketResponse> task) {
super(task, null);
this.task = task;
}
public PacketChain add(FutureTask<PacketResponse> nextFutureTask) {
next = new PacketChain(nextFutureTask);
return next;
}
@Override
public void run() {
try {
PacketResponse response = task.get();
set(response);
if(next != null && ResponseStatus.OK.equals(response.getStatus())) {
// Launch successive packet in same thread (synchronously)
set(next.get());
}
} catch (InterruptedException | ExecutionException e) {
// This will ensure interruption or other exceptions will bubble up
setException(e);
}
}
}
To use it, you'd simply have to do the following to add successive tasks to the first packet:
PacketChain task = new PacketChain(firstPacketTask);
task.add(secondPacketTask)
.add(thirdPacketTask);
Perhaps your SendRecord
class could then focus on splitting the information into pieces and returning a single PacketChain
representing the cumulation of all calls necessary to send the information required.
Since you asked it here and not on stackexchange. Maybe you went in a wrong design, should users need to decide if your program sends sync or async, your sure ?. Decisions like that should not be made by users, they should be idiot proven to work in all situations. your program should discover itself what's best, reducing the number of user interface options.
Explore related questions
See similar questions with these tags.
sendToQueueAsync
method but wanted to see how can I extend my design so that I can send data in either of those three ways.