I have two consumers, and each consumer has its own validation logic, entirely different from the other's, so I am using two validator classes, one for each consumer.
- ConsumerA: for this consumer, I am using ValidatorA.
- ConsumerB: for this consumer, I am using ValidatorB.
ConsumerA class
public class ConsumerA extends Consumer {
.....
@Override
protected void run(String consumerName, Properties consumerProps) {
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(getTopicsBasisOnConsumerName());
try {
while (!closed.get()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(2000);
for (ConsumerRecord<byte[], byte[]> record : records) {
GenericRecord genericRecord = decoder.decode(record.value());
ValidatorA validation = new ValidatorA(consumerName, genericRecord);
if (!validation.isValid()) {
logger.logError("dropping records. payload= ", genericRecord);
continue;
}
// setting variables to DataPacket, do I need this class at all?
DataPacket packet = new DataPacket.Builder(genericRecord).setClientId(validation.getClientId())
.setDeviceId(validation.getDeviceId()).setPayId(validation.getPayId()).setHolder(validation.getHolder())
.setOldTimestamp(validation.getOldTimestamp()).setNewTimestamp(validation.getNewTimestamp()).build();
Processor.getInstance().execute(packet);
}
}
} catch (Exception ex) {
System.out.println("error= " + ex);
}
}
}
ConsumerB class
public class ConsumerB extends Consumer {
.....
@Override
protected void run(String consumerName, Properties consumerProps) {
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(getTopicsBasisOnConsumerName());
try {
while (!closed.get()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(2000);
for (ConsumerRecord<byte[], byte[]> record : records) {
GenericRecord genericRecord = decoder.decode(record.value());
ValidatorB validation = new ValidatorB(consumerName, genericRecord);
if (!validation.isValid()) {
logger.logError("dropping records. payload= ", genericRecord);
continue;
}
// setting variables to DataPacket, do I need this class at all?
DataPacket packet = new DataPacket.Builder(genericRecord).setType(validation.getType())
.setDatumId(validation.getDatumId()).setItemId(validation.getItemId()).setOldTimestamp(validation.getOldTimestamp())
.setNewTimestamp(validation.getNewTimestamp()).build();
Processor.getInstance().execute(packet);
}
}
} catch (Exception ex) {
System.out.println("error= " + ex);
}
}
}
Below are my corresponding validator classes:
ValidatorA class
This class is called by ConsumerA, basically its isValid method for validation.
public class ValidatorA extends Validator {
private static final Logger logger = Logger.getInstance(ValidatorA.class);
private final String consumerName;
private final GenericRecord genericRecord;
private final Long oldTimestamp;
private final Long newTimestamp;
private final String clientId;
private final String deviceId;
private final Integer payId;
private final Map<String, String> holder;
public ValidatorA(String consumerName, GenericRecord genericRecord) {
this.consumerName = consumerName;
this.genericRecord = genericRecord;
this.oldTimestamp = (Long) DataUtils.parse(genericRecord, "oldTimestamp");
this.newTimestamp = (Long) DataUtils.parse(genericRecord, "newTimestamp");
this.clientId = (String) DataUtils.parse(genericRecord, "clientId");
this.deviceId = (String) DataUtils.parse(genericRecord, "deviceId");
this.payId = (Integer) DataUtils.parse(genericRecord, "payId");
this.holder = (Map<String, String>) DataUtils.parse(genericRecord, "holder");
}
@Override
public boolean isValid() {
return isValidClientIdDeviceId() && isValidPayId() && isValidHolder();
}
private boolean isValidHolder() {
if (MapUtils.isEmpty(holder)) {
logger.logError("invalid holder.");
return false;
}
return true;
}
private boolean isValidPayId() {
if (payId == null) {
logger.logError("invalid payId.");
return false;
}
return true;
}
private boolean isValidClientIdDeviceId() {
if (Strings.isNullOrEmpty(clientId) && Strings.isNullOrEmpty(deviceId)) {
logger.logError("invalid clientId and deviceId.");
return false;
}
return true;
}
// getter and toString method here
}
ValidatorB class
This class is called by ConsumerB, basically its isValid method for validation.
public class ValidatorB extends Validator {
private static final Logger logger = Logger.getInstance(ValidatorB.class);
private final String consumerName;
private final GenericRecord genericRecord;
private final Long oldTimestamp;
private final Long newTimestamp;
private final String type;
private final String datumId;
private final String itemId;
public ValidatorB(String consumerName, GenericRecord genericRecord) {
this.consumerName = consumerName;
this.genericRecord = genericRecord;
this.oldTimestamp = (Long) DataUtils.parse(genericRecord, "oldTimestamp");
this.newTimestamp = (Long) DataUtils.parse(genericRecord, "newTimestamp");
this.type = (String) DataUtils.parse(genericRecord, "type");
this.datumId = (String) DataUtils.parse(genericRecord, "datumId");
this.itemId = (Integer) DataUtils.parse(genericRecord, "itemId");
}
@Override
public boolean isValid() {
return isValidType() && isValidDatumId() && isValidItemId();
}
private boolean isValidType() {
if (Strings.isNullOrEmpty(type)) {
logger.logError("invalid type is coming.");
return false;
}
return true;
}
private boolean isValidDatumId() {
if (Strings.isNullOrEmpty(datumId)) {
logger.logError("invalid datumId is coming.");
return false;
}
return true;
}
private boolean isValidItemId() {
if (Strings.isNullOrEmpty(itemId)) {
logger.logError("invalid itemId is coming.");
return false;
}
return true;
}
// getter and toString method here
}
Validator class
public abstract class Validator {
public abstract boolean isValid();
}
Now in my Processor class, in the execute method, I use the DataPacket passed from both consumers to extract all the variables that have been set and do some processing with them. Later on we get some new variables, which I set by cloning the old packet with its builder.
private void execute(DataPacket packet) {
// extract all the fields from this packet
// do some processing using them
// now clone the packet and set processed variables.
String type = packet.getType();
String clientId = packet.getClientId();
// do some processing
String schemaId = ....;
Schema schema = ....;
// cloning old packet and setting new variables
DataPacket clonedPacket = new DataPacket.Builder(packet).setSchemaId(schemaId).setSchema(schema).build();
// send clonedPacket to some other system which will send to database
}
My question is: do I need this DataPacket at all when I already have the ValidatorA and ValidatorB classes? The DataPacket class just contains all the variables combined from both validator classes. I use the DataPacket builder just to set data in both consumers, then use the packet in the execute method of the Processor class, and then clone the old builder to make a new builder object again by setting some new variables.
Is there any better way to do this? It looks like I can get rid of DataPacket, but I am not sure how.
- This question looks a bit foobarish. Do "A" and "B" mean something, and if so, what? – 200_success, Dec 21, 2016 at 6:35
- They are kafka consumers. I mentioned in the question they are consumers but forgot to mention kafka consumers. Will add that. – david, Dec 21, 2016 at 6:36
- "A" and "B" still sound like rather poor names, though, for whatever you are trying to do. – 200_success, Dec 21, 2016 at 6:38
1 Answer
The question is rather vague; I'll assume you haven't taken the time to step back and look at your overall architecture.
Still, a lot can be said already, so here's my understanding (you should explain this in your question; a code dump isn't going to get you many answers):
- ConsumerA and ConsumerB run constantly, and in parallel.
- They produce some DataPacket.
- Each DataPacket is validated (ConsumerA uses ValidatorA etc.).
- Each DataPacket is sent to the execute method of a singleton Processor class (the Processor class is not provided, but the execute method is).
- The execute() method clones the DataPacket and modifies it.
- The cloned DataPacket is sent to 'some other system'.
You're bothered because the ValidatorX classes and the DataPacket class share some internal representation (to what extent is not obvious, because you did not provide the DataPacket class).
Read your own code carefully, it tells you what's wrong
ValidatorB validation = new ValidatorB(consumerName, genericRecord);
You define two Validator classes. However, you name their instances 'validation'. That is because the Validator class is wrongly named, and you felt it when you had to use it: it does not validate whatever input you throw at it. It validates one piece of data when it is constructed (which is already frowned upon), then gives you the result upon request (the real job is done by the DataUtils.parse calls in the constructor).
I expect a Validator to be stateless (or lightly parameterized if it helps reuse) and be used like this:
public class ConsumerA {
    // A single stateless validator, shared by every record
    private static final Validator validator = new ValidatorA();
    (...)
    protected void run(String consumerName, Properties consumerProps) {
        for (...) {
            (...)
            if (validator.isValid(consumerName, genericRecord)) {
                // Do something
            }
        }
    }
}
This way the instance is reused, no useless internal state is kept, etc. That already solves what was bothering you (Validator and DataPacket sharing internal state), plus it saves a bunch of object creation and destruction.
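To make the idea concrete, here is a minimal sketch of a stateless validator. It assumes a plain Map<String, Object> as a stand-in for GenericRecord (to keep the example free of Avro dependencies), and the names RecordValidator and StatelessValidatorB are hypothetical, not taken from your code.

```java
import java.util.Map;

// Hypothetical interface: the validator keeps no per-record state.
interface RecordValidator {
    boolean isValid(String consumerName, Map<String, Object> record);
}

// Stateless counterpart of ValidatorB: every check reads from the record passed in.
final class StatelessValidatorB implements RecordValidator {
    @Override
    public boolean isValid(String consumerName, Map<String, Object> record) {
        return hasText(record, "type") && hasText(record, "datumId") && hasText(record, "itemId");
    }

    // True when the field is present, is a String, and is not empty.
    private static boolean hasText(Map<String, Object> record, String field) {
        Object value = record.get(field);
        return value instanceof String && !((String) value).isEmpty();
    }
}

class StatelessValidatorDemo {
    // One shared instance is enough because it holds no state.
    private static final RecordValidator VALIDATOR = new StatelessValidatorB();

    public static void main(String[] args) {
        Map<String, Object> good = Map.of("type", "t", "datumId", "d", "itemId", "i");
        Map<String, Object> bad = Map.of("type", "t", "datumId", "");
        System.out.println(VALIDATOR.isValid("consumerB", good)); // true
        System.out.println(VALIDATOR.isValid("consumerB", bad));  // false
    }
}
```

Because the instance has no fields that depend on a record, the same object can serve every iteration of the poll loop.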
Since your ValidatorX classes will still need to pass around the data in a coherent manner internally, you could even make such a Validator work on a DataPacket internally, and return null when validation fails or said DataPacket when validation passes. But since I don't have the DataPacket class, and I don't know if you're allowed to modify it if need be, I can't be adamant here.
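A rough sketch of that variant, under the same assumptions as before: a Map<String, Object> stands in for GenericRecord, and PacketSketch is a hypothetical, trimmed-down stand-in for DataPacket, since the real class was not posted.

```java
import java.util.Map;

// Hypothetical, trimmed-down stand-in for DataPacket (the real class was not posted).
final class PacketSketch {
    final String type;
    final String datumId;
    final String itemId;

    PacketSketch(String type, String datumId, String itemId) {
        this.type = type;
        this.datumId = datumId;
        this.itemId = itemId;
    }
}

// Parses and validates in one step: returns the packet, or null when the record is invalid.
final class PacketValidatorB {
    PacketSketch validate(Map<String, Object> record) {
        String type = text(record, "type");
        String datumId = text(record, "datumId");
        String itemId = text(record, "itemId");
        if (type == null || datumId == null || itemId == null) {
            return null; // the caller drops the record
        }
        return new PacketSketch(type, datumId, itemId);
    }

    // Returns the field as a non-empty String, or null if it is missing, empty, or not a String.
    private static String text(Map<String, Object> record, String field) {
        Object value = record.get(field);
        return (value instanceof String && !((String) value).isEmpty()) ? (String) value : null;
    }
}
```

In the consumer loop, a null result would then take the place of the !isValid() branch, and the fields are parsed only once.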
Consumers share duplicate code.
You could regroup them in an abstract class and two subclasses with just the differing bits.
Or (much better) use the Strategy pattern on a single GenericRecordConsumer, which you build by providing it with a specific Validator. Note that you can only do that if you implemented a reusable Validator class and not a disposable Validation class (as mentioned above).
public class GenericRecordConsumer extends Consumer {
    private final Validator validator;

    public GenericRecordConsumer(Validator validator) {
        this.validator = validator;
    }
    .....
    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());
        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(2000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord genericRecord = decoder.decode(record.value());
                    if (!this.validator.isValid(consumerName, genericRecord)) {
                        logger.logError("dropping records. payload= ", genericRecord);
                        continue;
                    }
                    // Build the DataPacket as before. Note that 'validation' no longer exists with a
                    // stateless validator: read these values from genericRecord, or have the
                    // validator return the DataPacket.
                    DataPacket packet = new DataPacket.Builder(genericRecord).setClientId(validation.getClientId())
                        .setDeviceId(validation.getDeviceId()).setPayId(validation.getPayId()).setHolder(validation.getHolder())
                        .setOldTimestamp(validation.getOldTimestamp()).setNewTimestamp(validation.getNewTimestamp()).build();
                    Processor.getInstance().execute(packet);
                }
            }
        } catch (Exception ex) {
            System.out.println("error= " + ex);
        }
    }
}
Then you can create them as easily as:
Consumer consumerA = new GenericRecordConsumer(new ValidatorA("consumerA"));
Consumer consumerB = new GenericRecordConsumer(new ValidatorB("consumerB"));
Much shorter, more extensible, zero code duplication. If other behaviour changes between the consumer versions (in your elusive (...)), that code can meet the same fate and be Strategy-fied.
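For a version of the Strategy idea you can run without Kafka, here is a small sketch. The poll loop is replaced by a plain list of records, a Map<String, Object> stands in for GenericRecord, and ValidationStrategy and RecordConsumerSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical strategy interface; the consumer only knows this type.
interface ValidationStrategy {
    boolean isValid(Map<String, Object> record);
}

// Single consumer class: the validation rule is injected, the loop is written once.
final class RecordConsumerSketch {
    private final ValidationStrategy validator;
    private final List<Map<String, Object>> accepted = new ArrayList<>();

    RecordConsumerSketch(ValidationStrategy validator) {
        this.validator = validator;
    }

    // Stand-in for the Kafka poll loop: keeps valid records, drops the rest.
    void consume(List<Map<String, Object>> records) {
        for (Map<String, Object> record : records) {
            if (validator.isValid(record)) {
                accepted.add(record);
            }
        }
    }

    List<Map<String, Object>> accepted() {
        return accepted;
    }
}

class StrategyDemo {
    public static void main(String[] args) {
        // Two behaviours from one class, chosen at construction time.
        RecordConsumerSketch consumerA = new RecordConsumerSketch(r -> r.get("payId") != null);
        RecordConsumerSketch consumerB = new RecordConsumerSketch(r -> r.get("type") != null);

        List<Map<String, Object>> records = new ArrayList<>();
        records.add(Map.of("payId", 7));
        records.add(Map.of("type", "x"));

        consumerA.consume(records);
        consumerB.consume(records);
        System.out.println(consumerA.accepted().size() + " " + consumerB.accepted().size());
    }
}
```

The lambdas are only there to keep the demo short; in your code the strategies would be the reusable ValidatorA and ValidatorB classes.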
Exception handling
Yup, don't do this:
try {
    while (!closed.get()) {
        (...)
    }
} catch (Exception ex) {
    System.out.println("error= " + ex);
}
First, System.out is a very bad exception handling mechanism (use a Logger).
Second, a consumer system running through a list of jobs continuously should have two sorts of exceptions:
- Recoverable exceptions, which are caught, logged properly, and probably also stored in DB for the admins, etc. You may even do some processing to salvage the operation that failed. Ultimately, they do not interrupt the Consumer work loop.
- Unrecoverable exceptions, which are of two kinds:
  - Unrecoverable exceptions of the Consumer (ConnectionToMyDatabaseIsLostException etc.), which are caught, logged, and trigger a special warning (email the admins, store in DB). Ultimately, they should interrupt the Consumer because its function is compromised somehow. Potentially, the rest of the system can still function, albeit without this particular consumer. You might have an admin panel with a 'revive Consumer' button for instance.
  - Unrecoverable errors which you know nothing about (OutOfMemoryError etc.). You can't tell why or when they appear, and you can do nothing about it. These should not be caught.
The work loop could look like:
try {
    while (!closed.get()) {
        try {
            (...)
        } catch (HarmlessException ex) {
            log.WARN("blah", ex);
            continue; // Pursue the job
        }
    }
} catch (HarmfulExceptionThatKillsTheConsumer ex) {
    logger.logError("error= ", ex); // use the logger here too, not System.out
    OtherSystem.signalConsumerFailure(this);
    return; // Exit the Consumer work early
}
// Note I did not catch any other exception, which I wouldn't know what to do with
(of course I would actually extract the whole internal try/catch into its own method with a throws clause, to clarify)
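Here is a sketch of that extraction, with the per-record work moved into its own method. The exception classes and the counters are hypothetical and only serve the demonstration; a list of strings stands in for the polled records.

```java
import java.util.List;

// Hypothetical exception types, named after the two categories described above.
class RecoverableRecordException extends Exception {
    RecoverableRecordException(String message) { super(message); }
}

class FatalConsumerException extends Exception {
    FatalConsumerException(String message) { super(message); }
}

final class WorkLoopSketch {
    int processed = 0;
    int skipped = 0;
    boolean stoppedEarly = false;

    // Per-record work lives in its own method; the throws clause documents what can go wrong.
    private void handle(String record) throws RecoverableRecordException, FatalConsumerException {
        if (record.isEmpty()) {
            throw new RecoverableRecordException("empty record");
        }
        if (record.equals("FATAL")) {
            throw new FatalConsumerException("consumer can no longer work");
        }
        processed++;
    }

    void run(List<String> records) {
        try {
            for (String record : records) {
                try {
                    handle(record);
                } catch (RecoverableRecordException ex) {
                    skipped++; // log it, then carry on with the next record
                }
            }
        } catch (FatalConsumerException ex) {
            stoppedEarly = true; // log, alert the admins, and leave the loop
        }
        // Anything else (for example OutOfMemoryError) is deliberately not caught.
    }
}
```

The inner catch keeps the loop alive for a bad record, while the outer catch ends the loop as soon as the consumer itself is compromised.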
Data validation
This makes me cringe a bit:
this.oldTimestamp = (Long) DataUtils.parse(genericRecord, "oldTimestamp");
this.newTimestamp = (Long) DataUtils.parse(genericRecord, "newTimestamp");
this.type = (String) DataUtils.parse(genericRecord, "type");
this.datumId = (String) DataUtils.parse(genericRecord, "datumId");
this.itemId = (Integer) DataUtils.parse(genericRecord, "itemId");
You have a single method that can (apparently) return Objects of several classes. It's probably some kind of reflection applied on a bean-like Object. I'm not against reflection, it is elegant. But it shouldn't be an excuse to create a carryall method that forgets its general purpose.
Since this method seems to return the common types (Long, Integer, String) and not some esoteric list of custom objects that would have to be extended as you go, it would be much more robust and elegant to have some parseInt, parseString and parseLong methods. This allows three things:
- It ditches that cast (I hate casting. If there's a cast, there's a better way!)
- It allows the parse method to be prepared for its expected type, and maybe do some more work on the input (Is integer positive? Is String empty? etc.)
- It prevents some stupid mistakes like this.datumId = (Long) DataUtils.parse(genericRecord, "itemId"); where the wrong field is read and an Integer is cast to Long: the compiler accepts the cast of the returned Object, and all hell breaks loose at runtime (a ClassCastException at best) because it doesn't mean a thing business-wise.
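One possible shape for such typed methods is sketched below. It assumes that the values can be looked up by field name and uses a Map<String, Object> in place of GenericRecord; TypedParsers and its require helper are hypothetical, not part of your DataUtils.

```java
import java.util.Map;

// Hypothetical typed accessors, standing in for a single Object-returning DataUtils.parse.
final class TypedParsers {
    private TypedParsers() {}

    static String parseString(Map<String, Object> record, String field) {
        return require(record, field, String.class);
    }

    static Long parseLong(Map<String, Object> record, String field) {
        return require(record, field, Long.class);
    }

    static Integer parseInt(Map<String, Object> record, String field) {
        return require(record, field, Integer.class);
    }

    // Returns the field if it has the expected type, fails with a descriptive message otherwise.
    private static <T> T require(Map<String, Object> record, String field, Class<T> type) {
        Object value = record.get(field);
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Field '" + field + "' is not a " + type.getSimpleName() + ": " + value);
        }
        return type.cast(value);
    }
}
```

In this sketch a missing field is rejected as well, since isInstance is false for null. Whether a missing field should throw or be reported as a validation failure is a design choice for your validators.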
Ties to external systems
Your comment:
// send clonedPacket to some other system which will send to database
is rather vague. How is this sent? Is the whole DataPacket sent, or just some fields? If this is not imposed by an external API, it might be improved, but I can't tell.
Overall
Comments are good, sufficient and to the point.
Variable naming is good and case is correct.
Log messages are a bit clumsy:
logger.logError("invalid itemId is coming.");
Could be replaced by
logger.logError("Invalid ItemId provided. ItemId must not be null nor empty. " + this.consumerName + " cannot validate this record.");