The code below was written for an interview question about designing a load balancer with only a register() method.
import java.util.HashSet;
import java.util.Set;

class LoadBalancer {
    private int maxAddresses;
    Set<Server> servers = new HashSet<>();

    LoadBalancer(int maxAddresses) {
        this.maxAddresses = maxAddresses;
    }

    public synchronized boolean register(Server server) {
        if (servers.contains(server)) {
            return false;
        }
        if (servers.size() == maxAddresses) {
            return false;
        }
        return servers.add(server);
    }
}
I believe this is a thread-safe, simple load balancer (just its register function). Essentially, only one thread can execute the register function at a time.
What if register is called frequently? How can we improve the current solution so that multiple threads can call register concurrently without blocking each other when they register different servers?
I tried to use a ConcurrentHashMap but couldn't make it thread-safe. This is where I stopped:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LoadBalancer {
    private int maxAddresses;
    Map<Server, Boolean> servers = new ConcurrentHashMap<>();

    LoadBalancer(int maxAddresses) {
        this.maxAddresses = maxAddresses;
    }

    public boolean register(Server server) {
        return servers.compute(server, (key, val) -> {
            if (servers.size() == maxAddresses) {
                return false;
            }
            if (servers.containsKey(key)) {
                return false;
            }
            return true;
        });
    }
}
Server class:
class Server {
    private String serverUrl;

    Server(String serverUrl) {
        this.serverUrl = serverUrl;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((serverUrl == null) ? 0 : serverUrl.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Server other = (Server) obj;
        if (serverUrl == null) {
            if (other.serverUrl != null)
                return false;
        } else if (!serverUrl.equals(other.serverUrl))
            return false;
        return true;
    }
}
(Note: hashCode() and equals() are auto-generated; please skip reviewing them in detail.)
Unit tests for clarification of API:
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

/**
 * Unit test for simple App.
 */
public class AppTest {
    @Test
    public void shouldRegisterInstanceByAddress() {
        int maxAddress = 10;
        LoadBalancer loadBalancer = new LoadBalancer(maxAddress);
        Server expected = new Server("127.0.0.1");
        boolean registered = loadBalancer.register(expected);
        assertTrue(registered);
    }

    @Test
    public void shouldNotRegisterMoreThanMax() {
        LoadBalancer loadBalancer = new LoadBalancer(1);
        Server server1 = new Server("127.0.0.1");
        Server server2 = new Server("127.0.0.2");
        boolean registered1 = loadBalancer.register(server1);
        boolean registered2 = loadBalancer.register(server2);
        assertTrue(registered1);
        assertFalse(registered2);
    }

    @Test
    public void shouldNotRegisterTheSameAddress() {
        LoadBalancer loadBalancer = new LoadBalancer(10);
        Server server1 = new Server("127.0.0.1");
        boolean registered1 = loadBalancer.register(server1);
        boolean registered2 = loadBalancer.register(server1);
        assertTrue(registered1);
        assertFalse(registered2);
    }

    @Test
    public void testRaceConditionOnRegister() throws InterruptedException {
        final int maxAddresses = 10;
        final LoadBalancer loadBalancer = new LoadBalancer(maxAddresses);
        final int totalThreads = 100;
        ExecutorService executor = Executors.newFixedThreadPool(totalThreads);
        for (int i = 0; i < totalThreads; i++) {
            final int serverId = i;
            executor.submit(() -> {
                loadBalancer.register(new Server("127.0.0." + serverId));
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        int registeredCount = loadBalancer.servers.size();
        assertEquals(maxAddresses, registeredCount, "More servers registered than the allowed maximum.");
    }
}
2 Answers
Version 1
In the first version, you synchronized the whole register() method.
As a result, while one thread is executing it, every other thread that calls it is blocked. But it works correctly.
By the way, the register() implementation can be simplified to:
public synchronized boolean register(Server server) {
    if (servers.size() == maxAddresses) {
        return false;
    }
    return servers.add(server);
}
The contains() check is redundant: Set.add() already returns false, leaving the set unchanged, when the element is present.
Version 2
The second implementation, in contrast, has several bugs.
It is broken even in a single-threaded environment.
Broken conditions
Every attempt to add a new, unique server ends up adding it to the map, even when the map is updated from a single thread.
public boolean register(Server server) {
    return servers.compute(server, (key, val) -> {
        if (servers.size() == maxAddresses) {
            return false;
        }
        if (servers.containsKey(key)) {
            return false;
        }
        return true;
    });
}
The code inside the lambda gives the impression that you expected its return statements to return the given boolean value from the register() method. If so, that's not how lambda expressions work. Each return in the lambda above produces the value to be associated with a new key, or the replacement value for an existing key.
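To make the point about return concrete, here is a minimal sketch. The class ComputeReturnDemo, its store() helper and the String keys are made up for illustration; only Map.compute() itself comes from the JDK. It shows that whatever the lambda returns becomes the value stored under the key, and that returning null stores nothing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ComputeReturnDemo {
    // Hypothetical helper: passes valueFromLambda straight through the remapping lambda.
    // The result is whatever compute() returns, i.e. the value now mapped to the key (or null).
    static Boolean store(Map<String, Boolean> map, String key, Boolean valueFromLambda) {
        // The lambda's return value becomes the value stored under the key;
        // it does not return from the enclosing method.
        return map.compute(key, (k, oldValue) -> valueFromLambda);
    }

    public static void main(String[] args) {
        Map<String, Boolean> map = new ConcurrentHashMap<>();
        store(map, "a", false); // creates the entry a=false
        store(map, "b", true);  // creates the entry b=true
        store(map, "c", null);  // null means "no mapping": nothing is added
        System.out.println(map.size()); // 2
    }
}
```

So a lambda that returns false does not reject the key; it stores false as the key's value.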
This condition does not guard against exceeding the limit:
if (servers.size() == maxAddresses) {
    return false;
}
When servers.size() == maxAddresses evaluates to true, i.e. when the limit has been reached, an attempt to add a new server (one that is not yet present) creates a new map entry [Server, false]. The size then no longer equals maxAddresses, so every subsequent attempt to add a new server creates a new map entry [Server, true].
The map can grow beyond the given maxAddresses limit, even if only a single thread updates it sequentially.
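The overflow can be reproduced in a single thread with a sketch like the one below. BrokenRegisterDemo is a hypothetical stand-in that copies the register() logic of version 2 and, as an assumption made only to keep the example self-contained, uses String keys in place of Server.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BrokenRegisterDemo {
    private final int maxAddresses;
    final Map<String, Boolean> servers = new ConcurrentHashMap<>();

    BrokenRegisterDemo(int maxAddresses) {
        this.maxAddresses = maxAddresses;
    }

    // Same logic as version 2; String stands in for Server (assumption for brevity).
    boolean register(String server) {
        return servers.compute(server, (key, val) -> {
            if (servers.size() == maxAddresses) {
                return false; // stored as the value, the entry is still created
            }
            if (servers.containsKey(key)) {
                return false;
            }
            return true;
        });
    }

    public static void main(String[] args) {
        BrokenRegisterDemo lb = new BrokenRegisterDemo(1);
        System.out.println(lb.register("127.0.0.1")); // true
        System.out.println(lb.register("127.0.0.2")); // false, yet the entry is added
        System.out.println(lb.register("127.0.0.3")); // true, size no longer equals the limit
        System.out.println(lb.servers.size());        // 3, although the limit is 1
    }
}
```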
Semantics
The values in Map<Server, Boolean> servers carry no real meaning.
Why should some registered servers be associated with true and others with false?
That's accidental complexity caused by the design decision to use a concurrent map, which looks like an entirely arbitrary decision.
Concurrency issues
If we turn a blind eye to the issues mentioned above and assume for a moment that the lambda contains a working condition that ensures the limit is not violated (and also ignore the arbitrary design decision to use a ConcurrentHashMap), such as:
if (servers.size() < maxAddresses) {
    // add new server
}
return null; // NO new entry created when remapping function returns null
it would not be enough. Two threads might execute it simultaneously, both pass the size check before either of them has inserted anything, and both proceed with updating the map.
ConcurrentHashMap allows two threads to access different buckets of the map simultaneously without waiting for each other.
But introducing a ConcurrentHashMap does not by itself solve the problem of limiting the size of the collection, since any update in any thread might be the one that reaches the specified limit.
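The race can be shown deterministically by forcing the unlucky interleaving. The sketch below is an illustration under stated assumptions: CheckThenActDemo and racyRegister() are hypothetical names, String keys stand in for Server, a plain put() replaces the compute() lambda, and a CyclicBarrier is used only to hold both threads between the check and the update.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

class CheckThenActDemo {
    // Check-then-act: the size check and the put are two separate steps.
    static void racyRegister(Map<String, Boolean> servers, String key,
                             int max, CyclicBarrier bothChecked) {
        if (servers.size() < max) {
            try {
                // Wait here until the other thread has also passed the check.
                bothChecked.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
            servers.put(key, Boolean.TRUE);
        }
    }

    // Runs two registrations against a limit of 1 and returns the resulting map size.
    static int run() {
        Map<String, Boolean> servers = new ConcurrentHashMap<>();
        CyclicBarrier bothChecked = new CyclicBarrier(2);
        Thread t1 = new Thread(() -> racyRegister(servers, "127.0.0.1", 1, bothChecked));
        Thread t2 = new Thread(() -> racyRegister(servers, "127.0.0.2", 1, bothChecked));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return servers.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 2, although the limit is 1
    }
}
```

Each individual map operation is thread-safe here; the limit is broken because nothing makes the check and the insertion one atomic step.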
Designing an ideal server-registration in a Vacuum
The problem statement is very contrived (as we already discussed in the comments). Maybe the interviewer wanted to test your communication skills (ability to explain and evaluate your decisions, find drawbacks in your code to iterate the design) and along the way you were expected to demonstrate some knowledge of concurrency.
But the exercise obviously has no aim to create something whole and useful. Keeping that in mind, can we make register()
faster than your version 1? Sure, we can...
DISCLAIMER: the description below explains how to implement a LoadBalancer.register() optimized for multithreaded execution (which is the specific problem the OP was tasked with). Keep in mind that citing this solution blindly is unlikely to help you in an interview, for the reasons outlined after this recipe.
RECIPE:
Introduce a Semaphore with the initial number of permits equal to the maximum number of servers.
Use a concurrent hash set to store the servers.
Acquire a permit before attempting to add a server. The permits guard the limit, while the concurrent set makes additions done in other threads safely visible.
After acquiring a permit, call Set.add() with the new server. If you get false as a result (the server is a duplicate), release the permit. On true, do nothing: we want the number of permits available at each moment to equal the number of servers that can still be added, so it should decrease with each new server. There is no need to check the size.
At the very beginning, up to maxAddresses (not the clearest name, by the way) threads can operate concurrently.
When the limit is reached, there are zero permits available, and no thread will be allowed to proceed in register().
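The recipe could be sketched roughly as follows. This is one possible reading, not a definitive implementation: SemaphoreLoadBalancer and size() are hypothetical names, String URLs stand in for Server, and the non-blocking tryAcquire() is my assumption so that register() returns false once the limit is reached instead of blocking.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

class SemaphoreLoadBalancer {
    // One permit per free slot; each successful registration keeps its permit for good.
    private final Semaphore freeSlots;
    // Thread-safe set, so concurrent add() calls need no extra locking.
    private final Set<String> servers = ConcurrentHashMap.newKeySet();

    SemaphoreLoadBalancer(int maxServers) {
        this.freeSlots = new Semaphore(maxServers);
    }

    boolean register(String serverUrl) {
        // Assumption: fail fast instead of blocking when no free slot is left.
        if (!freeSlots.tryAcquire()) {
            return false;
        }
        boolean added = servers.add(serverUrl);
        if (!added) {
            freeSlots.release(); // duplicate: give the slot back
        }
        return added;
    }

    int size() {
        return servers.size();
    }
}
```

One trade-off of this sketch: a duplicate registration briefly holds a permit, so a concurrent registration of a new server can be rejected even though a slot is still free a moment later.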
And here the interviewer can proceed to torture you, saying: "That's nice, but the purpose of a load balancer is to distribute incoming requests between the servers, which happens far more often than adding new servers. How about introducing a getServer() method?"
And now you're in a pickle, realizing that this solution for optimizing register() in a vacuum, without considering the class's other responsibilities, has no practical value. It doesn't matter how fast something is if you can't leverage it.
Different concerns arise when you consider a piece of functionality in the context of other class responsibilities, rather than completely detached from everything.
If you introduce getServer()
it'll become apparent that you need to separate read and write access.
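As a rough illustration of what separating read and write access might look like, here is a sketch built on ReentrantReadWriteLock. Everything in it is an assumption rather than part of the original task: the class name ReadWriteLoadBalancer, the round-robin selection policy, String URLs in place of Server, and returning null when no server is registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReadWriteLoadBalancer {
    private final int maxServers;
    private final List<String> servers = new ArrayList<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger next = new AtomicInteger();

    ReadWriteLoadBalancer(int maxServers) {
        this.maxServers = maxServers;
    }

    // Rare operation: takes the exclusive write lock.
    boolean register(String serverUrl) {
        lock.writeLock().lock();
        try {
            if (servers.size() >= maxServers || servers.contains(serverUrl)) {
                return false;
            }
            return servers.add(serverUrl);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Frequent operation: many readers may hold the read lock at the same time.
    // Hypothetical round-robin policy; returns null if nothing is registered.
    String getServer() {
        lock.readLock().lock();
        try {
            if (servers.isEmpty()) {
                return null;
            }
            int index = Math.floorMod(next.getAndIncrement(), servers.size());
            return servers.get(index);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

With this shape, register() calls block each other again, which is acceptable only if reads really do dominate writes.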
-
thanks for the answer! gist.github.com/Dimchikkk/c3cb66610e3a126ff03208fa8a38d791 does this version look close to what you have in mind for the semaphore solution? (drets, Sep 10, 2024 at 8:58)
-
@drets Not exactly. If we're considering only a cartoon problem of a write-efficient register() in a vacuum and our goal is only to fill the underlying collection to the brim, then Semaphore.acquire() will be enough. It'll establish a happens-before relationship, ensuring that every thread will see modifications made by other threads. Intrinsic locking via a synchronized block, as in the linked example, is unnecessary. With this additional synchronization, it's an equivalent of your version 1 in disguise. (Alexander Ivanchenko, Sep 10, 2024 at 10:15)
-
Yeah, I want to understand how to write write-efficient concurrent code in Java, so a vacuum example is fine. I removed synchronized and the race condition test started to fail from time to time, so the synchronized block on servers is apparently needed and we are back to square one. (drets, Sep 10, 2024 at 10:23)
-
@drets Well done and good catch regarding the concurrent hash set. Here's why it's needed: with a non-concurrent data structure, a thread that has acquired a semaphore permit is only guaranteed to see the changes made prior to the latest release call (my bad, I forgot about that). Here's what the Semaphore class documentation says: "Memory consistency effects: Actions in a thread prior to calling a "release" method such as release() happen-before actions following a successful "acquire" method such as acquire() in another thread." (Alexander Ivanchenko, Sep 10, 2024 at 11:42)
-
@drets Hence, a concurrent hash set for proper synchronization, whilst guarding the limit of servers is ensured through the number of permits. Regarding performance, if we assume that adding a server is more time-consuming (which would be the case if we were not only updating an in-memory collection but also storing the data externally), this approach will definitely be more performant than synchronizing the whole method. (Alexander Ivanchenko, Sep 10, 2024 at 11:42)
Minor point: I have looked up HashSet<E>.add and it says:
public boolean add(E e)
Adds the specified element to this set if it is not already present. More formally, adds the specified element e to this set if this set contains no element e2 such that (e==null ? e2==null : e.equals(e2)). If this set already contains the element, the call leaves the set unchanged and returns false.
That would mean the check
if (servers.contains(server)) {
    return false;
}
is already performed by the call in return servers.add(server);
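The quoted behaviour is easy to confirm with a small sketch. AddReturnValueDemo and addTwice() are hypothetical names, and a String stands in for Server.

```java
import java.util.HashSet;
import java.util.Set;

class AddReturnValueDemo {
    // Adds the same element twice and returns both results of Set.add().
    static boolean[] addTwice(Set<String> set, String element) {
        boolean first = set.add(element);
        boolean second = set.add(element);
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        Set<String> servers = new HashSet<>();
        boolean[] results = addTwice(servers, "127.0.0.1");
        System.out.println(results[0]);     // true: the element was absent and has been added
        System.out.println(results[1]);     // false: duplicate, the set is unchanged
        System.out.println(servers.size()); // 1
    }
}
```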
register calls" - fine. But what are the other requirements? What about unregister(), or how are you going to pick a Server from the set to deal with a request (that's the raison d'être of every load balancer)?
Map<Server, Boolean> servers - what are the boolean values intended to represent?