I am trying to read a bunch of S3 parquet files in parallel from an S3 bucket. After reading all these files, I populate my products and productCatalog concurrent maps. This happens during server startup, and then I have getter methods GetProductMap and GetProductCatalogMap that return these maps to the main application threads.
The getters will be called by a lot of application threads concurrently, so the idea is to populate the maps during server startup (and also periodically from a background thread using a ticker) and then access them via the getters from the main application threads. I want the writes to be atomic so that as soon as they happen, the new data is visible to the reader threads.
type clientRepo struct {
    s3Client       *awss3.S3Client
    deltaChan      chan string
    done           chan struct{}
    err            chan error
    wg             sync.WaitGroup
    cfg            *ParquetReaderConfig
    products       *cmap.ConcurrentMap
    productCatalog *cmap.ConcurrentMap
}
type fileChannel struct {
    fileName string
    index    int
}
Below is my loadFiles method which, given a path, finds all the files I need to read in parallel. I am using errgroup here to communicate error states across goroutines. The idea is very simple: find all the files in the S3 bucket and read them in parallel, populate my internal maps, and then use those internal maps to populate my concurrent maps.
func (r *clientRepo) loadFiles(path string, spn log.Span) error {
    var err error
    bucket, key, err := awss3.ParseS3Path(path)
    if err != nil {
        return err
    }
    var files []string
    files, err = r.s3Client.ListObjects(bucket, key, ParquetFileExtension)
    if err != nil {
        return err
    }
    spn.Infof("Loading files from %s. Total files: %d", path, len(files))
    start := time.Now()
    fileChan := make(chan fileChannel)
    g, ctx := errgroup.WithContext(context.Background())
    for i := 0; i < runtime.NumCPU()-2; i++ {
        workerNum := i
        g.Go(func() error {
            for file := range fileChan {
                if err := r.read(spn, file.fileName, bucket); err != nil {
                    spn.Infof("worker %d failed to process %s : %s", workerNum, file, err.Error())
                    return err
                } else if err := ctx.Err(); err != nil {
                    spn.Infof("worker %d context error in worker: %s", workerNum, err.Error())
                    return err
                }
            }
            spn.Infof("worker %d processed all work on channel", workerNum)
            return nil
        })
    }
    func() {
        for idx, file := range files {
            select {
            case fileChan <- fileChannel{fileName: file, index: idx}:
                continue
            case <-ctx.Done():
                return
            }
        }
    }()
    close(fileChan)
    err = g.Wait()
    if err != nil {
        return err
    }
    spn.Info("Finished loading all files. Total duration: ", time.Since(start))
    return nil
}
Here is the read method, which reads each file, deserializes it into ClientProduct structs, and iterates over them to populate my internal maps. From those internal maps I then populate my concurrent maps. I am not sure whether I need this intermediate step - maybe I could collect all this data on a channel instead of populating it inside the read method, but that could increase the memory footprint by a lot, so that's why I went with this design.
func (r *clientRepo) read(spn log.Span, file string, bucket string) error {
    var err error
    var products = make(map[string]*definitions.CustomerProduct)
    var productCatalog = make(map[string]map[int64]bool)
    fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
    if err != nil {
        return errs.Wrap(err)
    }
    defer xio.CloseIgnoringErrors(fr)
    pr, err := reader.NewParquetReader(fr, nil, int64(r.cfg.DeltaWorkers))
    if err != nil {
        return errs.Wrap(err)
    }
    if pr.GetNumRows() == 0 {
        spn.Infof("Skipping %s due to 0 rows", file)
        return nil
    }
    for {
        rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
        if err != nil {
            return errs.Wrap(err)
        }
        if len(rows) <= 0 {
            break
        }
        byteSlice, err := json.Marshal(rows)
        if err != nil {
            return errs.Wrap(err)
        }
        var productRows []ClientProduct
        err = json.Unmarshal(byteSlice, &productRows)
        if err != nil {
            return errs.Wrap(err)
        }
        for i := range productRows {
            var flatProduct definitions.CustomerProduct
            err = r.Convert(spn, &productRows[i], &flatProduct)
            if err != nil {
                return errs.Wrap(err)
            }
            if flatProduct.StatusCode == definitions.DONE {
                continue
            }
            products[strconv.FormatInt(flatProduct.ProductId, 10)] = &flatProduct
            for _, catalogId := range flatProduct.Catalogs {
                catalogValue := strconv.FormatInt(int64(catalogId), 10)
                if v, ok := productCatalog[catalogValue]; ok {
                    v[flatProduct.ProductId] = true
                } else {
                    productCatalog[catalogValue] = map[int64]bool{flatProduct.ProductId: true}
                }
            }
        }
    }
    for k, v := range products {
        r.products.Set(k, v)
    }
    for k, v := range productCatalog {
        r.productCatalog.Upsert(k, v, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
            m := newValue.(map[int64]bool)
            var updatedMap map[int64]bool
            if valueInMap == nil { // New value!
                updatedMap = m
            } else {
                typedValueInMap := valueInMap.([]int64)
                updatedMap = m
                for _, k := range typedValueInMap {
                    updatedMap[k] = true
                }
            }
            a := make([]int64, 0, len(m))
            for k := range m {
                a = append(a, k)
            }
            return a
        })
    }
    return nil
}
And these are my getter methods, which will be accessed by the main application threads:
func (r *clientRepo) GetProductMap() *cmap.ConcurrentMap {
    return r.products
}

func (r *clientRepo) GetProductCatalogMap() *cmap.ConcurrentMap {
    return r.productCatalog
}
Note:
My products map uses productId as the key and flatProduct as the value.
But my productCatalog map uses catalogId as the key and a unique list of productIds as the value.
Here is the concurrent map I am using: https://github.com/orcaman/concurrent-map
And here is the upsert method I am using: https://github.com/orcaman/concurrent-map/blob/master/concurrent_map.go#L56
I am using this parquet library to read all these S3 files.
Problem Statement
I am looking for ideas on whether anything can be improved in the above design or in the way I am populating my maps. I am opting for a code review to see if anything can be changed to improve performance or reduce the memory footprint.
1 Answer
Avoid worker pools
The loadFiles method spawns runtime.NumCPU()-2 goroutines to act as workers in a pool. This is a pattern you are probably familiar with from other languages where threads are OS threads, so it's better for efficiency if each thread can be scheduled on a separate CPU core. However, in Go, goroutines are lightweight and already multiplexed across OS threads, so it makes less sense to limit the number of goroutines in this way. In particular, if some of the worker goroutines are blocked waiting on I/O, some CPU cores will sit idle.
There are good reasons to limit the number of concurrent goroutines, usually related to file descriptor limits and memory usage. However, using the number of CPUs to limit the number of goroutines is a code smell.
I would recommend a concurrency pattern we saw in a previous question, namely limiting the number of active goroutines using a semaphore. Then loadFiles would look something like this:
func (r *clientRepo) loadFiles(ctx context.Context, path string, spn log.Span) error {
    // Parse the S3 path and load the list of files as before.
    files := ...

    start := time.Now()
    g, ctx := errgroup.WithContext(ctx)
    // sem acts as a semaphore to limit the number of concurrent goroutines.
    sem := make(chan struct{}, 100)
loop:
    for _, file := range files {
        select {
        case <-ctx.Done():
            break loop
        case sem <- struct{}{}:
        }
        file := file
        g.Go(func() error {
            defer func() { <-sem }()
            return r.read(ctx, spn, file, bucket)
        })
    }
    if err := g.Wait(); err != nil {
        return err
    }
    spn.Info("Finished loading all files. Total duration: ", time.Since(start))
    return nil
}
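As an aside, if you can use a recent version of golang.org/x/sync, errgroup now has a SetLimit method that enforces this bound for you, so the manual semaphore channel can be dropped. A minimal sketch, assuming files and bucket come from the listing code as before:

    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(100) // g.Go blocks once 100 goroutines are active
    for _, file := range files {
        file := file // capture the loop variable (not needed from Go 1.22 onward)
        g.Go(func() error {
            return r.read(ctx, spn, file, bucket)
        })
    }
    if err := g.Wait(); err != nil {
        return err
    }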
A few other minor comments
Pass context as a parameter
Both loadFiles and read should take a context as a parameter instead of calling context.Background. This gives the flexibility to add a timeout or cancellation later.
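For example, the caller that runs the startup load could then bound it with a deadline. A minimal sketch; the repo variable and the five-minute timeout are just placeholders:

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    if err := repo.loadFiles(ctx, path, spn); err != nil {
        // Startup load failed or timed out; handle or log it here.
    }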
Avoid nested maps
Nested maps can require a lot of allocations. In read, consider making productCatalog a flat map with a struct key type.
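For instance, something along these lines (a sketch; the catalogProduct name is made up, and it applies to the local map built inside read, since the shared concurrent map still needs string keys):

    // catalogProduct identifies one (catalog, product) pair.
    type catalogProduct struct {
        catalogID int64
        productID int64
    }

    // One flat map replaces map[string]map[int64]bool.
    productCatalog := make(map[catalogProduct]bool)
    productCatalog[catalogProduct{catalogID: 42, productID: 1001}] = true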
In fact, you could consider inserting directly into the concurrent map without constructing a separate map locally. Then read would look something like this:
func (r *clientRepo) read(ctx context.Context, spn log.Span, file string, bucket string) error {
    // Initialize the reader as before, passing ctx instead of context.Background().
    pr := ...

    for {
        rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
        if err != nil {
            return err
        }
        if len(rows) <= 0 {
            break
        }
        byteSlice, err := json.Marshal(rows)
        if err != nil {
            return err
        }
        var productRows []ClientProduct
        err = json.Unmarshal(byteSlice, &productRows)
        if err != nil {
            return err
        }
        for i := range productRows {
            // Going with the idea that Convert returns a CustomerProduct.
            flatProduct, err := r.Convert(spn, productRows[i])
            if err != nil {
                return err
            }
            if flatProduct.StatusCode == definitions.DONE {
                continue
            }
            r.products.Set(strconv.FormatInt(flatProduct.ProductId, 10), flatProduct)
            for _, catalogId := range flatProduct.Catalogs {
                catalogValue := strconv.FormatInt(int64(catalogId), 10)
                r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
                    productID := newValue.(int64)
                    if valueInMap == nil {
                        return []int64{productID}
                    }
                    oldIDs := valueInMap.([]int64)
                    for _, id := range oldIDs {
                        if id == productID {
                            // Already exists, don't add duplicates.
                            return oldIDs
                        }
                    }
                    return append(oldIDs, productID)
                })
            }
        }
    }
    return nil
}
This uses a linear scan to check for duplicate product IDs, so it will be faster than allocating a map if the number of IDs per catalog is small. It's up to you to test the performance of this suggestion.
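If you do want to measure it, a rough micro-benchmark along these lines would compare the two approaches; the helper name and the size of 32 IDs per catalog are arbitrary assumptions:

    package dedupe

    import "testing"

    // containsLinear reports whether id is already in ids (the linear-scan approach).
    func containsLinear(ids []int64, id int64) bool {
        for _, v := range ids {
            if v == id {
                return true
            }
        }
        return false
    }

    func BenchmarkLinearScan(b *testing.B) {
        for n := 0; n < b.N; n++ {
            ids := make([]int64, 0, 32)
            for id := int64(0); id < 32; id++ {
                if !containsLinear(ids, id) {
                    ids = append(ids, id)
                }
            }
        }
    }

    func BenchmarkMapSet(b *testing.B) {
        for n := 0; n < b.N; n++ {
            set := make(map[int64]bool, 32)
            for id := int64(0); id < 32; id++ {
                set[id] = true
            }
        }
    }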
Avoid "getters"
The GetProductMap
and GetProductCatalogMap
methods are unnecessary boilerplate. Just make products
and productCatalog
exported fields.
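That would look something like this, with application code reading r.Products and r.ProductCatalog directly:

    type clientRepo struct {
        // ... other fields as before ...

        // Exported so application code can use the maps directly.
        Products       *cmap.ConcurrentMap
        ProductCatalog *cmap.ConcurrentMap
    }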
- Thanks for your good suggestion. I am working on making changes related to worker pools. If I insert directly into the concurrent map, will there be a problem with the main application threads reading all the data from the same map? I mean any performance issue, locking issue, or anything else? Also, can you provide an example of "inserting directly into the concurrent map without constructing a separate map locally" using my same concurrent map upsert strategy? – AndyP, Mar 20, 2022 at 8:01
- I am going over that presentation now which you shared in the other question. Btw, I have at most 60 files in total, so is 100 a good number here for the semaphore or should we bring it down? Also, I just tried this version in the AWS environment, replacing my worker pool implementation with the semaphore changes, and I am noticing memory is almost full on the box even after the file read is completed, while my original worker pool build takes only half of the memory on the box. Not sure what's wrong here with the semaphore changes. Is it because of 100, or something else you can think of? – AndyP, Mar 20, 2022 at 8:21
- I am still using g, ctx := errgroup.WithContext(context.Background()) as it is for now for testing this out. I hope this isn't causing the memory issues with the semaphore changes I made. Also, I am noticing a very weird thing: if I use 20 instead of 100 for the semaphore, then my file loading never completes at all, and I have 60 files in total. Not sure what's happening behind the scenes. Are we missing anything in the semaphore logic? – AndyP, Mar 20, 2022 at 8:58
- Sorry, my original example code forgot to release the semaphore. Fixed. I picked 100 arbitrarily; you can decrease it to trade off memory use against speed. – rose, Mar 20, 2022 at 14:20
- Yeah, I went back to the previous question where you answered the same thing, and I noticed you forgot to release the semaphore. I am testing these changes and so far they look good. I also wanted to ask: is there a difference between keeping an id as int64 vs keeping it as a string as a map key? Will that affect memory usage too? – AndyP, Mar 20, 2022 at 19:18