To illustrate how to do leader election with Apache Zookeeper I've created an straightforward application and I would like to have to threading part of the application reviewed.
I've created a simple class (Speaker
) which outputs an unique identifier to a file on a regular interval. I've implemented the Runnable
interface and start it with a ScheduledExecutorService
with a fixed delay like this:
scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);
The Speaker
class also implements a Listener
interface so it can register itself to a Monitor
(running in a separate thread, communicating to the Zookeeper server) which can enable or disable the output to the file.
My review question (besides general code review): Is this the best way to start and stop functionality in a scheduled thread?
Speaker & Speaker Server:
Complete project: https://github.com/cyberroadie/zookeeper-leader
Code explanation: http://cyberroadie.wordpress.com/2011/11/24/implementing-leader-election-with-zookeeper/
public class Speaker implements Runnable, NodeMonitor.NodeMonitorListener {
private String message;
private String processName;
private long counter = 0;
private volatile boolean canSpeak = false;
public Speaker(String message) throws IOException, InterruptedException, KeeperException {
this.message = message;
this.processName = getUniqueIdentifier();
}
private static String getUniqueIdentifier() {
String processName = ManagementFactory.getRuntimeMXBean().getName();
String processId = processName.substring(0, processName.indexOf("@"));
return "pid-" + processId + ".";
}
public void run() {
try {
if (canSpeak) {
handleTask();
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void handleTask() throws IOException {
FileWriter fstream = new FileWriter("out.txt");
BufferedWriter out = new BufferedWriter(fstream);
out.write(message + ": " + counter++ + " " + processName + "\n");
out.close();
}
@Override
public void startSpeaking() {
this.canSpeak = true;
}
@Override
public void stopSpeaking() {
this.canSpeak = false;
}
@Override
public String getProcessName() {
return processName;
}
}
Speaker Server:
public class SpeakerServer {
final static Logger logger = LoggerFactory.getLogger(SpeakerServer.class);
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private static NodeMonitor monitor;
private static void printUsage() {
System.out.println("program [message] [wait between messages in millisecond]");
}
public static void main(String[] args) {
if (args.length < 2) {
printUsage();
System.exit(1);
}
long delay = Long.parseLong(args[1]);
Speaker speaker = null;
try {
speaker = new Speaker(args[0]);
monitor = new NodeMonitor();
monitor.setListener(speaker);
monitor.start();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS);
logger.info("Speaker server started with fixed time delay of " + delay + " milliseconds.");
}
}
2 Answers 2
you could also look at using a fixed thread pool. That way, if one thread becomes blocked for some reason, say the disk became full, the threads would wait in a queue until the blocked thread terminated. That way you wouldn't loose any unique id's or run into memory problems if too many threads became blocked.
Hope that helps,
Ross
I'd consider stopping the scheduler (or canceling the task) on a stopSpeaking
call and restarting it on a startSpeaking
call (or resubmitting the Runnable
). I'm not familiar with Zookeeper, I don't know anything about the typical usage, so if stop/startSpeaking calls arrives often it won't be the best solution but it seems cleaner if there are long breaks between stopSpeaking
and startSpeaking
calls.
A few notes about the code:
Consider using SLF4J to log the exceptions too instead of
printStackTrace()
.Close the
BufferedWriter
in afinally
block:final BufferedWriter out = new BufferedWriter(fstream); try { ... } finally { out.close(); }
System.exit
isn't a nicest way to stop an entire application, especially hidden in acatch
block of arun
method. I'd rethrow theIOException
as aRuntimeException
(or aRuntimeException
subclass):@Override public void run() { if (!canSpeak) { return; } try { handleTask(); } catch (final IOException e) { throw new RuntimeException(e); } }
then use the
ScheduledFuture
ofscheduleWithFixedDelay
to wait for theRuntimeException
:final ScheduledFuture<?> speakerFuture = scheduler.scheduleWithFixedDelay(speaker, 0, delay, TimeUnit.MILLISECONDS); logger.info("Speaker server started with fixed time delay of " + delay + " milliseconds."); try { speakerFuture.get(); } finally { scheduler.shutdown(); }
Disclaimer: I don't like the usage of
RuntimeException
here, but unfortunately thescheduleAtFixedRate
accepts onlyRunnable
objects (notCallable
s which could throw anyException
).Please notice the guard clause also.
I'm not completely sure that
ScheduledExecutorService
guarantees whether scheduled task will always run in the same thread or not. (I suppose it does not.) Furthermore, thecounter
field in theSpeaker
class could be synchronized properly due to volatile piggybacking but I'd use a defensive strategy here and useAtomicInteger
for the counter to make it less error-prone.I'd check
null
s in the constructor ofSpeaker
. checkNotNull from Guava is a great choice for that.this.message = checkNotNull(message, "message cannot be null");
(Also see: Effective Java, 2nd edition, Item 38: Check parameters for validity)
SLF4J supports
{}
, use that:logger.info("Speaker server started with fixed time delay of {} milliseconds.", delay);