I am building a Spring Boot REST API app that is part of a microservice architecture project. What I am planning:
- My app listens for events from two other services and, after some business logic, logging, calculating, and persistence, I have to publish my results to another RabbitMQ exchange.
- The first two services listen to pings every 10 seconds from 15,000 devices each and publish their data to queues that I listen to.
My current plan:
I need some sort of cache to keep track of events sent from both services, because I need data from both within a given time interval to do my logic. I am using an in-memory hashtable and I am sorting the values in this hashtable every time I update it.
My hashmap is a
Map<String, Map<String, Instant>>
where the first key is a regionID and the second map is keyed by deviceID. The sorting is done on the internal map, and I suspect it might become the bottleneck.
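Roughly, the update path looks like this (a simplified sketch, not my actual code; class and method names are just illustrative):

import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch: regionID -> (deviceID -> last event time), re-sorting the
// inner map by timestamp on every update.
class EventCache {

    private final Map<String, Map<String, Instant>> cache = new ConcurrentHashMap<>();

    void onEvent(String regionId, String deviceId, Instant eventTime) {
        cache.compute(regionId, (region, devices) -> {
            Map<String, Instant> updated = (devices == null) ? new HashMap<>() : new HashMap<>(devices);
            updated.put(deviceId, eventTime);
            // This sort-on-every-write is what I suspect will become the bottleneck.
            Map<String, Instant> sorted = new LinkedHashMap<>();
            updated.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue())
                    .forEach(e -> sorted.put(e.getKey(), e.getValue()));
            return sorted;
        });
    }
}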
Is a single shared key-value data structure a good way to go here, or is there something else I should try?
- Whenever performance is involved, you need to implement, measure, and work on your approach if results are not good enough. Nobody here knows what your app will actually do or what the real requirements regarding event correlation would be. – Hans-Martin Mosner, Jul 9, 2024 at 10:40
- See Green fields, blue skies, and the white board - what is too broad? – gnat, Jul 9, 2024 at 13:14
- I understand that I should implement and then see if that works; I just wanted advice on whether a single shared storage where millions of reads and writes happen daily is something that is done in the industry. – dk tammy, Jul 9, 2024 at 13:52
- You'd be surprised how many different things (including very stupid ones) are done daily in the industry... – Hans-Martin Mosner, Jul 10, 2024 at 6:14
- @dktammy That's sounding dangerously close to "cargo cult programming" - i.e. trying to choose solutions based on whether they happen to work for other people. Keep in mind that other people will be working in a completely different context to yours, and with different requirements. You have nothing to lose by throwing a quick prototype together and trying it out; either it works or it doesn't; random strangers can't know any better than you whether it'll work for your particular situation. – Ben Cottrell, Jul 10, 2024 at 18:53
3 Answers
An important thing to consider is what kind of hardware you can use, and whether additional hardware or additional development effort is a cheaper way to make it faster (or fast enough).
The reason you implement and measure first is that you have little idea in advance of how much each part affects performance. You might make something ten times faster only to find that its share went from 0.1 to 0.01 seconds out of ten, while reducing a 5-second step by 20% to 4 seconds would have been much more effective.
Based on your question, I suppose the most important factor will be how many records are actually relevant.
In a day you get around 129,600,000 pings (15,000 devices × 6 pings per minute × 60 minutes × 24 hours).
But is that ping count really your number, or is 15,000 your number? For example, if the pings are from servers to measure uptime, you would have a list of 15,000 servers:
Region ID - Device ID - Last seen dateTime
On every ping, you would just check and update the lastSeen value, for example. Is that the kind of process you need?
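As a sketch in Java (since your app is Spring Boot; class and field names here are just illustrative), that table is little more than:

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: one entry per device (15,000), not one per ping (129.6M).
// Each incoming ping only overwrites the lastSeen value for its device.
class LastSeenRegistry {

    record DeviceKey(String regionId, String deviceId) {}

    private final Map<DeviceKey, Instant> lastSeen = new ConcurrentHashMap<>();

    void onPing(String regionId, String deviceId, Instant pingTime) {
        lastSeen.merge(new DeviceKey(regionId, deviceId), pingTime,
                (oldValue, newValue) -> newValue.isAfter(oldValue) ? newValue : oldValue);
    }
}

A structure like this stays at 15,000 entries no matter how many pings arrive.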
Managing an active queue of 129M records is a totally different game than one of 15K.
If you do need to manage the 129M, a big-data solution might help you, as shaping timed records into time series is a quite well-known problem. The question is where you need custom solutions and where you can use proven ones.
Note that the example code is available on GitHub
Here is how I would do it:
This approach has several advantages:
- The Sources and the Consumers (Sinks) are decoupled.
- Instead of an in-memory cache, you have a message queue - you will not lose any ping metrics and it scales much better.
- This scales: You can add an arbitrary number of consumers to the message queue with durable subscriptions under the same name, which means the messages are distributed (load-balanced) across those consumers. On Kubernetes, you could even tie that to a Horizontal Pod Autoscaler and would not even have to think about scaling. You can also scale the message queue itself, basically ad infinitum.
- Each component becomes dead simple. I have implemented this with Telegraf as the ping source, NATS as the message queue, a simple Go program as the consumers, and CloudEvents as the data transfer format.
The Idea
Ping source
As mentioned above, I have implemented this via Telegraf and a dead simple config:
[[outputs.nats]]
servers = ["nats://nats:4222"]
name = "telegraf"
subject = "telegraf"
data_format = "cloudevents"
cloudevents_version = "1.0"
cloudevents_source = "telegraf"
cloudevents_event_type = "com.github.mwmahlberg.se-monitoringevents-454044.telegraf"
cloudevents_event_time = "creation"
[outputs.nats.jetstream]
name = "ping-results"
[[inputs.ping]]
urls = [${PING_TARGETS}]
method = "native"
All you have to do is add the URLs you want to ping, and you can even do this via an environment variable, as shown.
Message Queue
The message queue serves two purposes here: it replaces the in-memory cache, which can easily lose data, and it decouples the Sources and Sinks for the pings, making either or both easily replaceable. By choosing a pub/sub pattern, you can also have multiple consumer types subscribing to ping events.
I have chosen NATS here for several reasons:
- It is comparatively lightweight for a message queue
- It scales well
- The client is dead simple to use.
Consumers
Now all your consumers have to do is subscribe to the NATS "cluster" (in the demo, it is a single server), all under the same durable subscriber name, and process the messages. If you make the consumers durable, they will also be able to process pings that were performed while they were offline.
If you have multiple processing steps, you can repeat the pub/sub pattern: simply send the processed event to a different subject on the message queue, have 1..n subscribers, and process those messages, as sketched below.
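As a rough sketch of that chaining step from the Java side (using the jnats client, since your app is Java; the subject name processed.pings and the payload are assumptions, and a stream covering that subject would have to exist):

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.Nats;
import java.nio.charset.StandardCharsets;

// Illustrative sketch: after processing a ping event, publish the result to a
// second subject, where the next group of durable subscribers picks it up.
public class ChainedPublisher {

    public static void main(String[] args) throws Exception {
        Connection nc = Nats.connect("nats://localhost:4222");
        try {
            JetStream js = nc.jetStream();
            String result = "{\"deviceId\":\"dev-42\",\"status\":\"ok\"}";
            js.publish("processed.pings", result.getBytes(StandardCharsets.UTF_8));
        } finally {
            nc.close();
        }
    }
}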
The implementation
I have set up a little demo.
docker-compose.yaml
configs:
telegraf:
content: |
[global_tags]
env = "demo"
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "0s"
[[outputs.health]]
[[outputs.nats]]
servers = ["nats://nats:4222"]
name = "telegraf"
subject = "telegraf"
data_format = "cloudevents"
cloudevents_version = "1.0"
cloudevents_source = "telegraf"
cloudevents_event_type = "com.github.mwmahlberg.se-monitoringevents-454044.telegraf"
cloudevents_event_time = "creation"
[outputs.nats.jetstream]
name = "ping-results"
[[inputs.ping]]
# The double dollar sign is required to escape the variable in docker-compose
# and to pass it to the telegraf container verbatim.
urls = [$${PING_TARGETS}]
method = "native"
volumes:
nats:
driver: local
services:
nats:
image: nats:2-alpine
command: "-m 8222 -n mq --js -sd /data"
volumes:
- type: volume
source: nats
target: /data
ports:
- 4222:4222
- 8222:8222
healthcheck:
test: [ "CMD", "wget", "http://localhost:8222/healthz", "-q", "-S", "-O", "-" ]
interval: 10s
timeout: 1s
retries: 5
start_period: 30s
telegraf:
image: telegraf:1.34-alpine
environment:
- HOSTNAME=telegraf
- PING_TARGETS="telegraf","nats"
restart: always
healthcheck:
test: [ "CMD", "wget", "http://localhost:8080/healthz", "-q", "-S", "-O", "-" ]
interval: 10s
timeout: 1s
retries: 5
start_period: 20s
depends_on:
nats:
condition: service_healthy
cap_add:
# Required in podman
- NET_RAW
configs:
- source: telegraf
target: /etc/telegraf/telegraf.conf
processor:
image: mwmahlberg/se-monitoringevents-454044-processor:latest
depends_on:
nats:
condition: service_healthy
telegraf:
condition: service_healthy
deploy:
mode: replicated
replicas: 2
environment:
- PROCESSOR_NATS_HOST=nats:4222
- PROCESSOR_NATS_STREAM_NAME=ping-results
- PROCESSOR_LOG_LEVEL=debug
Absolutely nothing spectacular here: Telegraf is set up as the ping source, NATS as the message queue, and a little Go application as the consumer.
Note that this docker-compose.yaml can actually be run; the image for the processor is available on DockerHub.
However, and this is part of the "trick": there are two instances of the consumer, and they can be scaled up to many more.
The processor
It is a simple, one-file go program:
package main
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/kelseyhightower/envconfig"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// loglevel is just a wrapper around slog.Level to implement the UnmarshalText method
// for parsing log levels from environment variables.
type loglevel slog.Level
// UnmarshalText implements the encoding.TextUnmarshaler interface for loglevel.
func (l *loglevel) UnmarshalText(text []byte) error {
switch string(text) {
case "debug":
*l = loglevel(slog.LevelDebug)
case "info":
*l = loglevel(slog.LevelInfo)
case "warn":
*l = loglevel(slog.LevelWarn)
case "error":
*l = loglevel(slog.LevelError)
default:
return fmt.Errorf("invalid log level: %s", text)
}
return nil
}
// timestamp is a wrapper around time.Time to implement the UnmarshalJSON method
// for parsing timestamps from JSON data.
// It converts the timestamp from nanoseconds since epoch to time.Time.
type timestamp time.Time
// UnmarshalJSON implements the json.Unmarshaler interface for timestamp.
func (t *timestamp) UnmarshalJSON(data []byte) error {
var ts int64
if err := json.Unmarshal(data, &ts); err != nil {
return err
}
*t = timestamp(time.Unix(0, ts))
return nil
}
// config holds the configuration for the processor service.
// It uses the envconfig package to load configuration from environment variables.
type config struct {
LogLevel loglevel `split_words:"true" default:"info" desc:"Log level (debug, info, warn, error)"`
Nats struct {
Host string `split_words:"true" required:"true" default:"localhost:4222" desc:"NATS server host and port"`
Client struct {
Name string `split_words:"true" default:"processor" desc:"NATS client name"`
}
Stream struct {
ClientName string `split_words:"true" default:"processor" desc:"NATS JetStream client name"`
Name string `split_words:"true" default:"ping-results" desc:"NATS JetStream stream name"`
}
}
}
// pingresult represents the structure of the ping result message.
// It is the Go representation of the data sent by the Telegraf ping plugin via CloudEvents.
type pingresult struct {
Fields struct {
AverageResponseMS float64 `json:"average_response_ms"`
MaximumResponseMS float64 `json:"maximum_response_ms"`
MinimumResponseMS float64 `json:"minimum_response_ms"`
PacketsReceived int `json:"packets_received"`
PacketsTransmitted int `json:"packets_transmitted"`
PercentPacketLoss float64 `json:"percent_packet_loss"`
ResultCode int `json:"result_code"`
StandardDeviationMS float64 `json:"standard_deviation_ms"`
TTL int `json:"ttl"`
}
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Timestamp timestamp `json:"timestamp"`
}
// processMsg is a function that processes incoming messages from the NATS JetStream.
// It unmarshals the CloudEvent data into a pingresult struct and sends it to the downstream channel.
// If the message cannot be processed, it sends a negative acknowledgment (Nak) to the NATS server.
// This way, the message will be retried later and is not lost.
func processMsg(downstream chan *pingresult) func(msg jetstream.Msg) {
return func(msg jetstream.Msg) {
evt := cloudevents.NewEvent()
if err := evt.UnmarshalJSON(msg.Data()); err != nil {
msg.Nak()
slog.Error("Failed to unmarshal CloudEvent", "error", err)
return
}
r := new(pingresult)
if err := evt.DataAs(r); err != nil {
msg.Nak()
slog.Error("Failed to unmarshal CloudEvent data", "error", err)
return
}
slog.Debug("Received CloudEvent", "event", evt.Type(),
"source", evt.Source(),
"url", r.Tags["url"],
"average", r.Fields.AverageResponseMS,
"packageloss", r.Fields.PercentPacketLoss,
"subject", msg.Subject(),
)
msg.Ack()
downstream <- r
}
}
func main() {
var cfg config
envconfig.Usage("processor", &cfg)
// Load configuration from environment variables
if err := envconfig.Process("processor", &cfg); err != nil {
slog.Error("Failed to process environment variables", "error", err)
os.Exit(1)
}
// Make sure we have a decent logger
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.Level(cfg.LogLevel),
})))
slog.Info("Starting processor service", "cfg", cfg)
// We need to handle OS signals to gracefully shut down the service
// This is important for long-running services to avoid data loss
// and to clean up resources properly.
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
// Connect to the NATS server
nc, err := nats.Connect(fmt.Sprintf("nats://%s", cfg.Nats.Host), nats.Name(cfg.Nats.Client.Name))
if err != nil {
slog.Error("Failed to connect to NATS server", "error", err)
os.Exit(1)
}
defer nc.Close()
// Create a JetStream context...
js, err := jetstream.New(nc)
if err != nil {
slog.Error("Failed to create JetStream context", "error", err)
os.Exit(1)
}
// ... and connect to the stream of incoming CloudEvents
stream, err := js.Stream(ctx, cfg.Nats.Stream.Name)
if err != nil {
if err != jetstream.ErrStreamNotFound {
slog.Error("Failed to get stream", "error", err)
os.Exit(1)
}
slog.Error("Stream not found!", "error", err)
os.Exit(1)
}
// Create a consumer for the stream...
cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "telegraf"})
if err != nil {
slog.Error("Failed to create consumer", "error", err)
os.Exit(1)
}
// ... and consume messages from the stream.
procChan := make(chan *pingresult, 64)
// Note that we hand over a callback function to the consumer (the result of processMsg)
// This function will be called for each message received from the stream.
cons.Consume(processMsg(procChan))
// All we have to do now is to process the messages received from the stream.
// Of course, we could also do this in the callback function, but this way we can
// separate the processing logic from the message receiving logic.
go func() {
for {
select {
case msg := <-procChan:
// Here you can add your processing logic
// For example, you can send the result to another stream or store it in a database
slog.Info("Processing ping result", "result", msg)
case <-ctx.Done():
// This will be called when the service receives SIGINT or SIGTERM
slog.Info("Shutting down processor service")
return
}
}
}()
// Block until a shutdown signal is received. Without this, main would return
// immediately, the deferred nc.Close() would run, and the consumer would stop
// before processing anything.
<-ctx.Done()
}
You can certainly expand on it, but here it only serves as a stand-in.
Some random additional thoughts
- With the setup shown above, you can scale both horizontally and vertically.
- Depending on your use cases, it might also be a good idea to use Kafka, Kafka Streams, and maybe ksqlDB, especially if your primary use case is aggregations. Telegraf could even be adapted to that: use the Kafka output plugin while keeping the output format, and you should be good to go.