I’ve developed a Go program to benchmark a FHIR server by uploading a dataset of FHIR resources concurrently.
The goal is to stress-test the server by sending a specified number (-threads
parameter) of concurrent upload requests. Each FHIR resource is uploaded via a single HTTP PUT request, and I’m using goroutines along with a semaphore to control concurrency.
My main question is: Does this program reliably achieve n concurrent requests to the server, as intended? I want to ensure that the server receives n simultaneous upload requests (or as close to that as possible) and that the program accurately measures upload speed and errors under this load.
The specific areas I’d love feedback on include:
- Concurrency Management: Is the use of goroutines and semaphore appropriate for controlling n concurrent requests?
- Accuracy of Metrics: I’ve used mutexes to prevent race conditions in counting successful uploads and errors. Will this setup ensure accurate metrics?
- Overall Program Structure: Any suggestions to improve code clarity, efficiency, or Go idioms would be greatly appreciated.
Below is the code (approx. 150 lines). For testing purposes, the dataset and full repository are available here.
package main
import (
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"net/http"
"path/filepath"
"strings"
"sync"
"time"
)
// Resource is the minimal shape decoded from each NDJSON line; only the
// logical ID is used (to build the PUT URL).
// NOTE(review): ResourceType has no json tag and is never read anywhere in
// this program — json.Unmarshal fills it only via case-insensitive field
// matching. Consider removing it.
type Resource struct {
ResourceType string
ID string `json:"id"`
}
// main parses CLI flags, eagerly loads every NDJSON input file into memory
// keyed by resource type (derived from the file's base name), then uploads
// each group and prints the total elapsed time plus success/error counts.
func main() {
bearerToken := flag.String("bearer_token", "", "Bearer token for authorization")
serverURL := flag.String("server_url", "", "Base URL of the FHIR server")
filesList := flag.String("files", "", "Comma-separated list of NDJSON files to upload")
threads := flag.Int("threads", 2, "Number of concurrent threads within a single file")
flag.Parse()
if *serverURL == "" || *filesList == "" {
fmt.Println("Server URL and files list are required.")
return
}
files := strings.Split(*filesList, ",")
totalResources := 0
totalErrors := 0
// dataMap holds the raw NDJSON lines per resource type; idsMap holds the
// parallel resource IDs. NOTE(review): two files sharing a base name would
// silently overwrite each other's entries here.
dataMap := make(map[string][]string)
idsMap := make(map[string][]string)
for _, file := range files {
trimmedFile := strings.TrimSpace(file)
// Whole file read up front. NOTE(review): ioutil.ReadFile is deprecated —
// os.ReadFile (or streaming via bufio.Scanner) is the modern equivalent.
data, err := ioutil.ReadFile(trimmedFile)
if err != nil {
fmt.Printf("Failed to read file %s: %v\n", trimmedFile, err)
continue
}
lines := strings.Split(string(data), "\n")
dataLines := make([]string, 0, len(lines))
ids := make([]string, 0, len(lines))
for _, line := range lines {
if line == "" {
continue
}
// Each NDJSON line must parse on its own; malformed lines are skipped.
var resource Resource
if err := json.Unmarshal([]byte(line), &resource); err != nil {
fmt.Println("Error parsing JSON:", err)
continue
}
dataLines = append(dataLines, line)
ids = append(ids, resource.ID)
}
// Resource type = base name up to the first dot, e.g. "Patient.ndjson" -> "Patient".
resourceType := filepath.Base(trimmedFile)
resourceType = strings.Split(resourceType, ".")[0]
dataMap[resourceType] = dataLines
idsMap[resourceType] = ids
}
// Timing covers the uploads only, not the parsing above.
start := time.Now()
for _, file := range files {
trimmedFile := strings.TrimSpace(file)
// NOTE(review): recomputes the same key as the load loop just to look the
// data back up — a slice of (type, lines, ids) groups would avoid the maps.
resourceType := filepath.Base(trimmedFile)
resourceType = strings.Split(resourceType, ".")[0]
dataLines := dataMap[resourceType]
ids := idsMap[resourceType]
count, errors := uploadData(*bearerToken, *serverURL, resourceType, dataLines, ids, *threads)
totalResources += count
totalErrors += errors
}
fmt.Printf("Completed in %v\n", time.Since(start))
fmt.Printf("Successfully uploaded %d resources with %d errors\n", totalResources, totalErrors)
}
func uploadData(bearerToken, serverURL, resourceType string, data, ids []string, threads int) (int, int) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, threads)
transport := &http.Transport{
MaxIdleConns: 256,
MaxIdleConnsPerHost: 256,
MaxConnsPerHost: 256,
}
client := &http.Client{
Timeout: time.Second * 30,
Transport: transport,
}
var count int
var errors int
var mu sync.Mutex
for i, jsonData := range data {
wg.Add(1)
go func(jsonStr, id string) {
defer wg.Done()
semaphore <- struct{}{}
resourceURL := fmt.Sprintf("%s/%s/%s", serverURL, resourceType, id)
req, err := http.NewRequest("PUT", resourceURL, strings.NewReader(jsonStr))
if err != nil {
fmt.Println("Error creating PUT request:", err)
mu.Lock()
errors++
mu.Unlock()
<-semaphore
return
}
req.Header.Set("Content-Type", "application/fhir+json")
if bearerToken != "" {
req.Header.Set("Authorization", "Bearer "+bearerToken)
}
response, err := client.Do(req)
if err != nil {
fmt.Println("Error sending PUT request:", err)
mu.Lock()
errors++
mu.Unlock()
<-semaphore
return
}
defer response.Body.Close()
io.Copy(ioutil.Discard, response.Body)
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated {
fmt.Printf("Received non-successful status: %s\n", response.Status)
mu.Lock()
errors++
mu.Unlock()
} else {
mu.Lock()
count++
mu.Unlock()
}
<-semaphore
}(jsonData, ids[i])
}
wg.Wait()
return count, errors
}
1 Answer
- I'd move the setup for the HTTP client out of the
uploadData
function, as it's not necessary to recreate it every time. - Move definitions closer to where they're being used
(e.g.
totalResources
is about 50 lines away from its first use). The better locality will make it easier to understand what's happening. - Use up-to-date library definitions (e.g.
os.ReadFile
in favour of ioutil.ReadFile, io.Discard instead of ioutil.Discard
). - I'd suggest keeping cleanup functionality together. Things like
discarding all of the HTTP response body and then closing it belong
together. In case of a
defer
put everything in there instead of having one of those in the regular code path and the other one during the defer
. - Explicitly ignore, or log errors. Debatable, but at least GoLand will flag every one of those instances. It might eventually pay off to see if some of those calls errored out, though it's going to be exceptional situations like low memory, or maybe a networked filesystem where some things that "don't fail", will.
- The semaphore usage is odd. I'd have said to spawn n worker
goroutines and have them read from the queue, that seems like a much
more common way of doing this, which also doesn't require any further
manual coordination. Depending on whether you want to only count
errors, or also collect them into a list, either the
atomic
package, or a chan error
could be used to get rid of the locking too. - Consider using a logging library, like the standard
slog
. - I'd keep data and ID together. Just use another custom struct, that's
easier to handle in the long run (just imagine adding one, or a few
more data points and suddenly you have several more
fooMap
, barMap and corresponding variable names to juggle) and more descriptive, since you can pass around e.g. a data []BenchmarkItem instead of data, ids []string.
. - Number of threads could default to number of CPU cores? Not sure that makes sense for your use case, but it's a pretty common default for workers.
- Consider splitting the worker setup from the HTTP call, making things
much easier testable. That is to say, move the body of the loop into
its own function, which can be called from more places, including
tests, than
uploadData
. It also makes it easier to use regular error handling up until the place where you actually decide to log your error, making things more composable as a result (cf. fmt.Errorf to wrap underlying errors, but also keep in mind that better functions like Wrap or Wrapf exist too in other libraries). - Consider not splitting the whole file, but doing it piecemeal via
bufio.Scanner
, which should be a lot more efficient for bigger files. In this case it's just good practice. - ResourceType on Resource is unused; remove it. - resourceType is calculated twice; just reuse it. Though I don't quite understand why it's done this way anyway — wouldn't a list of resource types plus the items for each be sufficient? No map necessary.
With all that in mind and some more things I've probably changed, this is what that would look like (not tested):
package main
import (
"bufio"
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
)
// Resource captures the only field needed from each NDJSON line: the
// logical ID used to build the PUT URL.
type Resource struct {
ID string `json:"id"`
}
// BenchmarkItem is one resource to upload: its ID plus the raw JSON body.
type BenchmarkItem struct {
ID string
Data []byte
}
// Benchmark groups every item of one FHIR resource type (the type is
// derived from the input file's base name).
type Benchmark struct {
ResourceType string
Items []BenchmarkItem
}
// main parses CLI flags, streams each NDJSON input file into a Benchmark
// group (one per resource type, derived from the file's base name), then
// uploads each group concurrently and prints timing and totals.
func main() {
	bearerToken := flag.String("bearer_token", "", "Bearer token for authorization")
	serverURL := flag.String("server_url", "", "Base URL of the FHIR server")
	filesList := flag.String("files", "", "Comma-separated list of NDJSON files to upload")
	threads := flag.Int("threads", 2, "Number of concurrent threads within a single file")
	flag.Parse()
	if *serverURL == "" || *filesList == "" {
		fmt.Println("Server URL and files list are required.")
		return
	}
	files := strings.Split(*filesList, ",")
	benchmarks := make([]Benchmark, 0, len(files))
	for _, file := range files {
		trimmedFile := strings.TrimSpace(file)
		f, err := os.Open(trimmedFile)
		if err != nil {
			fmt.Printf("Failed to read file %s: %v\n", trimmedFile, err)
			continue
		}
		// Anonymous func so the deferred Close runs per file, not at main's end.
		func() {
			defer func() { _ = f.Close() }()
			var items []BenchmarkItem
			scanner := bufio.NewScanner(f)
			// FHIR resources can exceed bufio.Scanner's default 64 KiB token
			// limit; allow lines up to 16 MiB.
			scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
			for scanner.Scan() {
				line := scanner.Bytes()
				if len(line) == 0 {
					continue
				}
				var resource Resource
				if err := json.Unmarshal(line, &resource); err != nil {
					fmt.Println("Error parsing JSON:", err)
					continue
				}
				// BUG FIX: scanner.Bytes() aliases the scanner's internal buffer,
				// which is overwritten by the next Scan. The slice must be cloned
				// before being retained past this iteration, otherwise every
				// stored item ends up pointing at later lines' bytes.
				items = append(items, BenchmarkItem{ID: resource.ID, Data: bytes.Clone(line)})
			}
			// A Scan loop can end on a read error (e.g. a too-long line), not
			// just EOF — surface it instead of silently truncating the input.
			if err := scanner.Err(); err != nil {
				fmt.Printf("Error reading file %s: %v\n", trimmedFile, err)
			}
			resourceType, _, _ := strings.Cut(filepath.Base(trimmedFile), ".")
			benchmarks = append(benchmarks, Benchmark{ResourceType: resourceType, Items: items})
		}()
	}
	// One shared client so connections are reused across all uploads.
	transport := &http.Transport{
		MaxIdleConns:        256,
		MaxIdleConnsPerHost: 256,
		MaxConnsPerHost:     256,
	}
	client := &http.Client{
		Timeout:   time.Second * 30,
		Transport: transport,
	}
	start := time.Now()
	totalResources := 0
	totalErrors := 0
	for _, benchmark := range benchmarks {
		count, errors := uploadData(client, *bearerToken, *serverURL, *threads, benchmark)
		totalResources += count
		totalErrors += errors
	}
	fmt.Printf("Completed in %v\n", time.Since(start))
	fmt.Printf("Successfully uploaded %d resources with %d errors\n", totalResources, totalErrors)
}
// callServer issues a single FHIR PUT (upsert) for one benchmark item and
// returns a non-nil error when the request could not be built or sent, or
// when the server responded with anything other than 200 OK / 201 Created.
// The response body is always drained and closed so the client's transport
// can reuse the connection.
func callServer(client *http.Client, bearerToken, serverURL, resourceType string, item BenchmarkItem) error {
	resourceURL := fmt.Sprintf("%s/%s/%s", serverURL, resourceType, item.ID)
	// Use the stdlib method constant rather than the "PUT" string literal.
	req, err := http.NewRequest(http.MethodPut, resourceURL, bytes.NewReader(item.Data))
	if err != nil {
		return fmt.Errorf("error creating PUT request: %w", err)
	}
	req.Header.Set("Content-Type", "application/fhir+json")
	if bearerToken != "" {
		req.Header.Set("Authorization", "Bearer "+bearerToken)
	}
	response, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("error sending PUT request: %w", err)
	}
	// Drain-then-close keeps the cleanup together and enables keep-alive reuse.
	defer func() {
		_, _ = io.Copy(io.Discard, response.Body)
		_ = response.Body.Close()
	}()
	if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated {
		return fmt.Errorf("received non-successful status: %s", response.Status)
	}
	return nil
}
// uploadData uploads every item in benchmark using a fixed pool of worker
// goroutines reading from a shared queue, and returns the number of
// successful uploads and the number of errors.
func uploadData(client *http.Client, bearerToken, serverURL string, threads int, benchmark Benchmark) (int, int) {
	// BUG FIX: with threads <= 0 no worker would ever drain the queue and the
	// send below would deadlock; fall back to a single worker.
	if threads < 1 {
		threads = 1
	}
	var wg sync.WaitGroup
	wg.Add(threads)
	queue := make(chan BenchmarkItem, threads)
	// Atomic counters avoid a mutex around the per-item bookkeeping.
	var count, errors atomic.Int64
	for i := 0; i < threads; i++ {
		go func() {
			defer wg.Done()
			for item := range queue {
				if err := callServer(client, bearerToken, serverURL, benchmark.ResourceType, item); err != nil {
					errors.Add(1)
				} else {
					count.Add(1)
				}
			}
		}()
	}
	for _, item := range benchmark.Items {
		queue <- item
	}
	// Closing the queue lets the workers drain remaining items and exit.
	close(queue)
	wg.Wait()
	return int(count.Load()), int(errors.Load())
}
There's likely more things that could be done, but I feel at this point it's readable enough and has been factored in more reusable parts already. Take it as a starting point for further work I suppose.