I am developing an event-sourced Electric Vehicle Charging Station Management System that is connected to several Charging Stations. In this domain, I've come up with an aggregate for the Charging Station, which includes the internal state of the Charging Station (whether it is network-connected, whether a car is charging using one of the station's connectors).
The station notifies me about its state through messages defined in a standardized protocol:
- Heartbeat: whether the station is still "alive"
- StatusNotification: whether the station has encountered an error (such as under-voltage), or if everything is correct
And my server can send commands to this station:
- RemoteStartTransaction: tells the station to unlock and reserve one of its connectors, for a car to charge using the connector.
I've developed an Aggregate for this Charging Station. It contains the internal entities for its connectors, whether each one is charging or not, whether the station has a problem in its power system, and so on.
The Aggregate, whose in-memory representation resides on the server that I control, not in the Charging Station itself, has a StationClient service, which is responsible for sending these commands to the physical Charging Station (pseudocode):
class StationAggregate {
    stationClient: StationClient
    URL: string
    connectors: Connector[]

    unlock(connectorId) {
        // Reject the command when the connector cannot be unlocked.
        if !this.connectors.find(connectorId).isAvailableToBeUnlocked() {
            return ErrorConnectorNotAvailable
        }
        // Side effect: the physical station is contacted before any event is persisted.
        error = this.stationClient.sendRemoteStartTransaction(this.URL, connectorId)
        if error {
            return ErrorStationRejectedUnlock
        }
        this.applyEvents([
            StationUnlockedEvent(connectorId, now())
        ])
        return Ok
    }

    receiveHeartbeat(timestamp) {
        // No side effect: only records that the station is alive.
        this.applyEvents([
            StationSentHeartbeat(timestamp)
        ])
        return Ok
    }
}
I am using optimistic concurrency, which means that I load the Aggregate from a list of events, and I store the current version of the Aggregate in its memory representation: if StationAggregate is at version #2032, then when a command is successfully processed and its event(s) applied, it would be at version #2033, for example. That way, I can put a unique constraint on the (StationID, Version) tuple in my persistence layer and guarantee that only one event is persisted for a given version.
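A minimal sketch of that unique constraint, assuming a relational store (SQLite here, purely for illustration). The table layout, the `append_event` helper and the station ID are hypothetical and not part of the system described above:

```python
import sqlite3

# Hypothetical schema: one row per event, unique per (station_id, version).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        station_id TEXT    NOT NULL,
        version    INTEGER NOT NULL,
        event_type TEXT    NOT NULL,
        UNIQUE (station_id, version)
    )
    """
)


def append_event(station_id: str, expected_version: int, event_type: str) -> bool:
    """Try to persist an event as version expected_version + 1.

    Returns False when another writer has already taken that version.
    """
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (station_id, version, event_type) VALUES (?, ?, ?)",
                (station_id, expected_version + 1, event_type),
            )
        return True
    except sqlite3.IntegrityError:
        # The unique constraint rejected a second event for the same version.
        return False


# Both handlers loaded the aggregate at version 2032.
heartbeat_saved = append_event("station-1", 2032, "StationSentHeartbeat")
unlock_saved = append_event("station-1", 2032, "StationUnlocked")
```

Whichever writer arrives second gets `False` back, which is exactly the rejection discussed next.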
Suppose a Heartbeat message and an Unlock command arrive at the same time. Both threads would load the StationAggregate, and both would see it at version X. Receiving the Heartbeat has no side effects, but the Unlock command has one: it tells the physical Charging Station to unlock a connector. However, as I'm using optimistic concurrency, that StationUnlocked event could be rejected by the persistence layer. I don't know how I could handle that, as I can't retry the command, because it is inherently not idempotent (the physical Station would reject the second request).
I don't know if I'm modelling something wrong, or if it's really a hard domain to model.
1 Answer
"I load the Aggregate from a list of events, and I store the current version of the Aggregate in its memory representation"
Rather than storing the current, versioned, memory representation of StationAggregate on every event, you should store the events themselves in an append-only repository and only store the current state of StationAggregate as a snapshot every X amount of time or every Y events.
When loading a StationAggregate, you would first load the last saved snapshot of the state and then the events that occurred after that snapshot was taken. Then you replay those events, making only the corresponding internal state changes and no calls to outside systems.
When you now have two events that occur at the same time, they both can be added to the datastore without any problems (as they are just appended in whatever order), and the next time the StationAggregate gets loaded, both events are reflected in the internal state.
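The loading procedure described above can be sketched as follows. This is only an illustration under assumptions: the `StationState` fields, the dictionary-shaped events and the `apply`/`load` helpers are hypothetical names, not taken from the question's code.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class StationState:
    """Internal state rebuilt from a snapshot plus the events recorded after it."""
    version: int = 0
    last_heartbeat: Optional[float] = None
    unlocked_connectors: Set[int] = field(default_factory=set)


def apply(state: StationState, event: Dict) -> None:
    """Pure state transition: no calls to the physical station happen here."""
    if event["type"] == "StationSentHeartbeat":
        state.last_heartbeat = event["timestamp"]
    elif event["type"] == "StationUnlocked":
        state.unlocked_connectors.add(event["connector_id"])
    state.version += 1


def load(snapshot: StationState, events_after_snapshot: List[Dict]) -> StationState:
    """Start from a copy of the last snapshot and replay the later events."""
    state = copy.deepcopy(snapshot)
    for event in events_after_snapshot:
        apply(state, event)
    return state


snapshot = StationState(version=10, last_heartbeat=100.0)
heartbeat = {"type": "StationSentHeartbeat", "timestamp": 130.0}
unlock = {"type": "StationUnlocked", "connector_id": 1}

# Two concurrent events appended in either order lead to the same state.
state_a = load(snapshot, [heartbeat, unlock])
state_b = load(snapshot, [unlock, heartbeat])
```

Because `apply` only mutates in-memory state, replaying never re-sends RemoteStartTransaction to the station; the side effect belongs to command handling, not to event application.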
With regard to how to deal with the Heartbeat events, that depends on where the StationAggregate instance lives most of the time.
If the StationAggregate exists mostly in RAM and only needs to be reloaded from the database when the system starts up (initial boot, reboot or crash), then I wouldn't even bother to persist the connection state. It is far easier to start with the assumption that the Station is offline and wait for (or request, if the protocol supports it) the first Heartbeat message. The only reason for persisting connection status events would be for later reporting/analysis, and that is best handled with CameOnline/WentOffline events.
If the StationAggregate exists mostly in the database and gets loaded for every request/event, then whether you need to persist the Heartbeat events or just CameOnline/WentOffline events depends on how the timers work in your system.
If your timers can be set to fire an event after a configurable time and they can be restarted before the timeout elapses, then the CameOnline/WentOffline events would be sufficient.
If your timers can't be restarted, then you would need to store the Heartbeat events to be able to periodically check how long ago the last one was received in order to detect that a Station went offline.
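For the case where Heartbeat events are stored, the periodic check could look roughly like the sketch below. The `connection_events` function, the timeout value and the timestamps are assumptions made for illustration; how a WentOffline timestamp is chosen (here, last heartbeat plus timeout) is a design decision, not something the protocol prescribes.

```python
from typing import List, Tuple


def connection_events(
    heartbeats: List[float], timeout: float, now: float
) -> List[Tuple[str, float]]:
    """Derive CameOnline/WentOffline events from stored heartbeat timestamps.

    A station counts as offline once more than `timeout` seconds pass
    without a heartbeat.
    """
    events: List[Tuple[str, float]] = []
    last = None
    for ts in sorted(heartbeats):
        if last is None:
            events.append(("CameOnline", ts))
        elif ts - last > timeout:
            # The gap was too large: the station went offline and came back.
            events.append(("WentOffline", last + timeout))
            events.append(("CameOnline", ts))
        last = ts
    # Periodic check: has the latest heartbeat become too old?
    if last is not None and now - last > timeout:
        events.append(("WentOffline", last + timeout))
    return events


timeline = connection_events([0.0, 30.0, 60.0, 200.0], timeout=90.0, now=400.0)
```

With a restartable timer, the same transitions would be produced live by resetting the timer on every heartbeat, so only the derived CameOnline/WentOffline events would need to be persisted.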
[...] StationUnlocked events would definitely conflict, but in this case a StationSentHeartbeat and a StationUnlocked event would not conflict.
[...] StationCameOnline and StationWentOffline. You will need the latter anyway to notice in the replay when there is a large enough gap in the heartbeats. And a true conflict would only arise when different orders of the events give a different end result, which I only see if the Station allows fewer Transactions than it has Connectors.
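One way to act on this distinction, sketched under assumptions: when the version check fails, compare the event you want to persist with the events committed since you loaded the aggregate, and only treat the failure as a real conflict if the event types actually clash. The `conflicts` rule and the `rebase` helper are hypothetical; the rule simply encodes the statement above that two StationUnlocked events conflict while a heartbeat and an unlock do not.

```python
from typing import Dict, List, Optional


def conflicts(mine: Dict, theirs: Dict) -> bool:
    """Assumed rule: only two StationUnlocked events conflict with each other."""
    return mine["type"] == "StationUnlocked" and theirs["type"] == "StationUnlocked"


def rebase(my_event: Dict, loaded_version: int, committed_since: List[Dict]) -> Optional[int]:
    """Return the version at which my_event can still be appended.

    Returns None when a concurrently committed event truly conflicts,
    in which case the command itself has to be reconsidered.
    """
    if any(conflicts(my_event, other) for other in committed_since):
        return None
    # No real conflict: append after the events that were committed in the meantime.
    return loaded_version + len(committed_since) + 1


unlock = {"type": "StationUnlocked", "connector_id": 1}
heartbeat = {"type": "StationSentHeartbeat", "timestamp": 130.0}

after_heartbeat = rebase(unlock, 2032, [heartbeat])
after_other_unlock = rebase(unlock, 2032, [{"type": "StationUnlocked", "connector_id": 2}])
```

In the heartbeat case only the append is retried at the new version; the command to the physical station is not sent again, so its lack of idempotency does not matter.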