I am working on a scheduler that connects to a Cassandra database, extracts data from a few tables and stores it in a few variables. My main thread then uses those variables by calling getters on the CassUtil class. Basically, I cache the results in memory, and a single background thread refreshes that cache every 15 minutes.
Here is my code, which connects to the Cassandra cluster and loads the data into the variables processMetadata, procMetadata and topicMetadata. I then call the getters for these three variables from my main thread to read the data.
public class CassUtil {
    private static final Logger LOGGER = Logger.getInstance(CassUtil.class);
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private List<ProcessMetadata> processMetadata = new ArrayList<>();
    private List<ProcMetadata> procMetadata = new ArrayList<>();
    private List<String> topicMetadata = new ArrayList<>();
    private Session session;
    private Cluster cluster;

    private static class Holder {
        private static final CassUtil INSTANCE = new CassUtil();
    }

    public static CassUtil getInstance() {
        return Holder.INSTANCE;
    }

    private CassUtil() {
        List<String> servers = TestUtils.HOSTNAMES;
        String username =
            TestUtils.loadCredentialFile().getProperty(TestUtils.USERNAME);
        String password =
            TestUtils.loadCredentialFile().getProperty(TestUtils.PASSWORD);
        PoolingOptions opts = new PoolingOptions();
        opts.setCoreConnectionsPerHost(HostDistance.LOCAL,
            opts.getCoreConnectionsPerHost(HostDistance.LOCAL));
        Builder builder = Cluster.builder();
        cluster =
            builder
                .addContactPoints(servers.toArray(new String[servers.size()]))
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withPoolingOptions(opts)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .withLoadBalancingPolicy(
                    DCAwareRoundRobinPolicy
                        .builder()
                        .withLocalDc(
                            !TestUtils.isProduction() ? "DC2" : TestUtils.getCurrentLocation()
                                .get().name().toLowerCase()).build())
                .withCredentials(username, password).build();
        try {
            session = cluster.connect("testkeyspace");
            StringBuilder sb = new StringBuilder();
            Set<Host> allHosts = cluster.getMetadata().getAllHosts();
            for (Host host : allHosts) {
                sb.append("[");
                sb.append(host.getDatacenter());
                sb.append(host.getRack());
                sb.append(host.getAddress());
                sb.append("]");
            }
            LOGGER.logInfo("CONNECTED SUCCESSFULLY TO CASSANDRA CLUSTER: " + sb.toString());
        } catch (NoHostAvailableException ex) {
            LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
        } catch (Exception ex) {
            LOGGER.logError("error= " + ExceptionUtils.getStackTrace(ex));
        }
    }

    // start a background thread which runs every 15 minutes
    public void startScheduleTask() {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    processMetadata = processMetadata(true);
                    topicMetadata = listOfTopic(TestUtils.GROUP_ID);
                    procMetadata = procMetadata();
                } catch (Exception ex) {
                    LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
                }
            }
        }, 0, 15, TimeUnit.MINUTES);
    }

    // called from main thread to initialize the metadata
    // and start the background thread
    public void initializeMetadata() {
        processMetadata = processMetadata(true);
        topicMetadata = listOfTopic(TestUtils.GROUP_ID);
        procMetadata = procMetadata();
        startScheduleTask();
    }

    public List<String> listOfTopic(final String consumerName) {
        List<String> listOfTopics = new ArrayList<>();
        String sql = "select topics from topic_metadata where id=1 and consumerName=?";
        try {
            // get data from cassandra
        } catch (Exception ex) {
            LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex), ", Consumer Name= ",
                consumerName);
        }
        return listOfTopics;
    }

    public List<ProcessMetadata> processMetadata(final boolean flag) {
        List<ProcessMetadata> metadatas = new ArrayList<>();
        String sql = "select * from process_metadata where id=1 and is_active=?";
        try {
            // get data from cassandra
        } catch (Exception ex) {
            LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex), ", active= ", flag);
        }
        return metadatas;
    }

    public List<ProcMetadata> procMetadata() {
        List<ProcMetadata> metadatas = new ArrayList<>();
        String sql = "select * from schema where id=1";
        try {
            // get data from cassandra
        } catch (SchemaParseException ex) {
            LOGGER.logError("schema parsing error= ", ExceptionUtils.getStackTrace(ex));
        } catch (Exception ex) {
            LOGGER.logError("error= ", ExceptionUtils.getStackTrace(ex));
        }
        return metadatas;
    }

    public void shutdown() {
        LOGGER.logInfo("Shutting down the whole cassandra cluster");
        if (null != session) {
            session.close();
        }
        if (null != cluster) {
            cluster.close();
        }
    }

    public Session getSession() {
        if (session == null) {
            throw new IllegalStateException("No connection initialized");
        }
        return session;
    }

    public Cluster getCluster() {
        return cluster;
    }

    public List<ProcessMetadata> getProcessMetadata() {
        return processMetadata;
    }

    public List<String> getTopicMetadata() {
        return topicMetadata;
    }

    public List<ProcMetadata> getProcMetadata() {
        return procMetadata;
    }
}
And here is my initializer code, which calls the initializeMetadata() method to initialize everything. I am using Spring here.
@Singleton
@DependencyInjectionInitializer
public class TestInitializer {
    public TestInitializer() {
        LOGGER.logInfo("Initializer called.");
        CassUtil.getInstance().initializeMetadata();
    }

    @PostConstruct
    public void postInit() {
        LOGGER.logInfo("PostInit called");
        // doing some stuff
        // accessing those three variables by calling getter on them from CassUtil class
    }

    @PreDestroy
    public void shutdown() {
        LOGGER.logInfo("Shutdown called");
        // doing some stuff
    }
}
I wanted to see if my CassUtil class can be improved in any way. My main idea is to access the processMetadata, procMetadata and topicMetadata variables from the main thread without calling Cassandra every time; the data should instead come from a cache that is updated every 15 minutes. So I need a background thread that runs every 15 minutes, extracts the data from the Cassandra tables and populates these variables, which I can then use from the main thread.
Is there any better way? I am using Java 7.
1 Answer
Suggestions for your solution:
I have a few suggestions for how I would improve your solution. BTW, you have done a really good job.
1, Externalize SQL queries
String sql = "select * from schema where id=1";
In my opinion, code should contain as few hardcoded strings as possible. When you need to change the table name from schema to new_schema, you will have to recompile the whole project. That's OK once or twice, but what about 20 times? I would pull my hair out :).
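One way to externalize the queries is a properties file that is read at startup. The following is only a minimal sketch: the class name QueryCatalog, the keys proc.metadata and process.metadata, and the idea of a queries.properties file are all assumptions made for illustration, and the inline string stands in for the file content so the example runs on its own.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper: looks up CQL statements by key instead of hardcoding them. */
final class QueryCatalog {
    private final Properties queries = new Properties();

    QueryCatalog(Reader source) {
        try {
            queries.load(source);
        } catch (IOException ex) {
            throw new IllegalStateException("Could not read query definitions", ex);
        }
    }

    /** Returns the statement stored under the key and fails fast if it is missing. */
    String get(String key) {
        String cql = queries.getProperty(key);
        if (cql == null) {
            throw new IllegalArgumentException("No query configured for key: " + key);
        }
        return cql;
    }

    public static void main(String[] args) {
        // Stand-in for the content of a (hypothetical) queries.properties file.
        String fileContent =
            "proc.metadata=select * from schema where id=1\n"
            + "process.metadata=select * from process_metadata where id=1 and is_active=?\n";
        QueryCatalog catalog = new QueryCatalog(new StringReader(fileContent));
        System.out.println(catalog.get("proc.metadata"));
    }
}
```

In a real application the Reader would come from a file or classpath resource, so renaming a table only means editing that file and restarting.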
2, Method shutdown refactoring
I would rather vote for closing only the cluster, because closing a Cluster also closes every Session that was created from it:
if (null != cluster) {
    cluster.close();
}
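Related to shutdown: the method in the question closes the session and the cluster but never stops the scheduler, and the threads created by Executors.newScheduledThreadPool are non-daemon by default, so they can keep the JVM alive. A hedged sketch of stopping an executor follows; the class SchedulerShutdown and its stop method are hypothetical names, and the timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing one way to stop a scheduler in an orderly fashion. */
final class SchedulerShutdown {

    /** Stops the executor and returns true if it terminated within the timeout. */
    static boolean stop(ExecutorService executor, long timeout, TimeUnit unit) {
        // Rejects new tasks; a scheduled executor also cancels periodic tasks by default.
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // A run is still in progress: interrupt it and wait once more.
            executor.shutdownNow();
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException ex) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // stand-in for the metadata refresh
            }
        }, 0, 15, TimeUnit.MINUTES);
        System.out.println("terminated: " + stop(scheduler, 5, TimeUnit.SECONDS));
    }
}
```

Calling something like this at the start of CassUtil.shutdown(), before closing the cluster, would keep a refresh from running against a closed session.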
3, The CassUtil constructor has 40+ lines. I think you should break it into a few methods. At the very least, building the cluster is a candidate for a private method.
4, Rename methods to follow the JavaBeans naming conventions
listOfTopic -> getTopics
processMetadata -> getProcessMetadata
...
Tip #1:
Since you are using Spring, I would think about making CassUtil a Spring-managed bean. If you use CassUtil in non-Spring components, you can skip the following:
@Configuration
public class AppConfig {
    @Bean
    public CassUtil cassUtil() {
        // you can delete Holder and make the CassUtil constructor public
        // by default, a @Bean has singleton scope
        return new CassUtil();
    }
}
With that you can inject it (@Autowired private CassUtil cassandraUtil) wherever you want (in a Spring component, of course).
I would create a new Spring @Component, for example CassandraProcessor, and put all the service methods there, such as the getters and the scheduled method (don't forget @EnableScheduling):
@Scheduled(fixedRate=900000)
public void startScheduleTask() {
    processMetadata = processMetadata(true);
    topicMetadata = listOfTopic(TestUtils.GROUP_ID);
    procMetadata = procMetadata();
}
This should be executed when the Spring application starts, so I suppose you don't need initializeMetadata at all.
After implementing this approach, whenever you need listOfTopic you just add @Autowired private CassandraProcessor cassandraProcessor and call cassandraProcessor.listOfTopic().
Tip #2:
The next option is to use Ehcache or a similar cache manager with a cache loader, so you wouldn't have to manage the scheduling yourself.
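If adding a cache library is not an option, the loader idea can also be sketched with the JDK alone. To be clear, this is not Ehcache's API: RefreshingCache and its methods are hypothetical, and the Callable stands in for the Cassandra query. The sketch keeps the last good snapshot when a load fails and publishes each new snapshot through a volatile field so that reader threads see it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the latest successfully loaded snapshot. */
final class RefreshingCache<T> {
    private final Callable<List<T>> loader;
    // volatile: a list published by the refresh thread becomes visible to readers
    private volatile List<T> snapshot = Collections.emptyList();

    RefreshingCache(Callable<List<T>> loader) {
        this.loader = loader;
    }

    /** Loads a fresh snapshot and keeps the previous one if the loader fails. */
    void refresh() {
        try {
            List<T> fresh = loader.call();
            snapshot = Collections.unmodifiableList(new ArrayList<T>(fresh));
        } catch (Exception ex) {
            // Swallowed on purpose: an exception escaping a scheduleAtFixedRate
            // task would suppress all later runs.
            System.err.println("refresh failed, keeping previous snapshot: " + ex);
        }
    }

    /** Returns the current immutable snapshot without touching the data source. */
    List<T> get() {
        return snapshot;
    }

    /** Loads once on the calling thread, then refreshes periodically in the background. */
    void start(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
        refresh();
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                refresh();
            }
        }, period, period, unit);
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        RefreshingCache<String> topics = new RefreshingCache<String>(new Callable<List<String>>() {
            @Override
            public List<String> call() {
                // stand-in for the Cassandra query
                List<String> result = new ArrayList<String>();
                result.add("topic-a");
                return result;
            }
        });
        topics.start(scheduler, 15, TimeUnit.MINUTES);
        System.out.println(topics.get());
        scheduler.shutdownNow();
    }
}
```

One such instance per data set (process metadata, proc metadata, topics) would replace the three mutable list fields, and callers would never observe a half-populated list.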
Is CassUtil not a Spring component? If not, why? Is there a requirement that CassUtil should not be a Spring component?