\$\begingroup\$

I am trying to read multiple files in parallel so that each goroutine that reads a file writes its data to a channel, while a single goroutine listens on that channel and adds the data to a map. Here is my play.

The code handles errors: if reading any file fails, it cancels the other goroutines that are still waiting to read files. Below is my worker pool implementation, which works fine:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

func main() {
	var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
	fileChan := make(chan string)
	dataChan := make(chan fileData)
	g, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 3; i++ {
		worker_num := i
		g.Go(func() error {
			for file := range fileChan {
				if err := getBytesFromFile(file, dataChan); err != nil {
					fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
					return err
				} else if err := ctx.Err(); err != nil {
					fmt.Println("worker", worker_num, "context error in worker:", err.Error())
					return err
				}
			}
			fmt.Println("worker", worker_num, "processed all work on channel")
			return nil
		})
	}
	// dispatch files
	g.Go(func() error {
		defer close(fileChan)
		done := ctx.Done()
		for _, file := range myFiles {
			if err := ctx.Err(); err != nil {
				return err
			}
			select {
			case fileChan <- file:
				continue
			case <-done:
				break
			}
		}
		return ctx.Err()
	})
	var err error
	go func() {
		err = g.Wait()
		close(dataChan)
	}()
	var myMap = make(map[string]string)
	for data := range dataChan {
		myMap[data.name] = data.bytes
	}
	if err != nil {
		fmt.Println("errgroup Error:", err.Error())
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	if err := enc.Encode(myMap); err != nil {
		panic(err)
	}
}

type fileData struct {
	name,
	bytes string
}

func getBytesFromFile(file string, dataChan chan fileData) error {
	bytes, err := openFileAndGetBytes(file)
	if err == nil {
		dataChan <- fileData{name: file, bytes: bytes}
	}
	return err
}

func openFileAndGetBytes(file string) (string, error) {
	if file == "file2" {
		return "", fmt.Errorf("%s cannot be read", file)
	}
	return fmt.Sprintf("these are some bytes for file %s", file), nil
}

Problem Statement

I am working with Go 1.17. Right now everything lives in my main function. I want to move the worker pool implementation out of main into its own types so that it can be reused efficiently by multiple parts of my application. I have a few other places in the code where this worker pool would fit, since it works well for this particular use case.

I am asking for a code review to see whether there are any improvements I can make to the code above, and how to move it into its own types and structs so that it can be reused by other pieces of code.

Toby Speight
asked Mar 17, 2022 at 5:46
\$\endgroup\$

1 Answer
\$\begingroup\$

Your code is very clear and simple. I have a few suggestions.

One small suggestion:

  • This is subjective, but I find Printf to be much more readable than Println because the format string shows you what the result will look like. Compare fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error()) with fmt.Printf("worker %d failed to process %s: %s\n", worker_num, file, err).
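To make the comparison concrete, here is a small sketch that builds both messages as strings so they can be compared side by side. The helper names and errSample are made up for this illustration, and Sprintln/Sprintf stand in for Println/Printf. Note that Println puts a space between every operand, so the first form ends up with a space before the colon.

```go
package main

import (
	"errors"
	"fmt"
)

// errSample is a hypothetical error used only for this illustration.
var errSample = errors.New("file2 cannot be read")

// withSprintln mirrors the Println call from the question.
// Sprintln inserts a space between all operands and appends a newline.
func withSprintln(workerNum int, file string, err error) string {
	return fmt.Sprintln("worker", workerNum, "failed to process", file, ":", err.Error())
}

// withSprintf mirrors the suggested Printf call.
// The format string shows the exact shape of the output.
func withSprintf(workerNum int, file string, err error) string {
	return fmt.Sprintf("worker %d failed to process %s: %s\n", workerNum, file, err)
}

func main() {
	fmt.Print(withSprintln(1, "file2", errSample))
	fmt.Print(withSprintf(1, "file2", errSample))
}
```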

I also have one comment about the concurrency strategy. It comes from the "Rethinking Classical Concurrency Patterns" presentation, which I recommend reading in full because it covers many other useful concurrency patterns for Go.

The principle to keep in mind is "start goroutines when you have concurrent work". A fixed set of worker goroutines that each read from a channel has a disadvantage: whenever fileChan has no data, all of the workers sit idle waiting for data to become available. Instead, I would recommend starting one goroutine per file to be processed, and limiting the number of simultaneously active goroutines with a semaphore.

The worker goroutines will be replaced by something like this:

// sem acts as a semaphore to limit the
// number of concurrent goroutines.
sem := make(chan struct{}, 3)

loop:
for _, file := range myFiles {
	// Use a select so that if ctx is cancelled early
	// we stop starting new goroutines.
	select {
	case <-ctx.Done():
		// A bare break would only leave the select,
		// so break out of the labeled loop instead.
		break loop
	case sem <- struct{}{}:
	}
	// Get a local copy of file for the goroutine.
	// https://go.dev/doc/faq#closures_and_goroutines
	file := file
	// Start a goroutine when you have concurrent work.
	g.Go(func() error {
		// Release the semaphore slot when this goroutine finishes.
		defer func() { <-sem }()
		// Process file as normal.
		return getBytesFromFile(file, dataChan)
	})
}

This would also eliminate the need to dispatch the list of files into a channel in a separate goroutine.
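For reference, here is a minimal, self-contained sketch of the semaphore approach that runs with the standard library alone. It is not the question's code: it swaps errgroup for sync.WaitGroup plus manual first-error tracking, and swaps dataChan for a mutex-guarded map. The function name readAll is made up for this illustration; openFileAndGetBytes is the stub from the question.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// openFileAndGetBytes is the stub from the question: "file2" always fails.
func openFileAndGetBytes(file string) (string, error) {
	if file == "file2" {
		return "", fmt.Errorf("%s cannot be read", file)
	}
	return fmt.Sprintf("these are some bytes for file %s", file), nil
}

// readAll (hypothetical name) reads the files with at most limit goroutines
// active at once. It returns whatever was read plus the first error, if any.
func readAll(files []string, limit int) (map[string]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		mu       sync.Mutex // guards results and firstErr
		wg       sync.WaitGroup
		firstErr error
		results  = make(map[string]string)
	)
	sem := make(chan struct{}, limit) // counting semaphore

loop:
	for _, file := range files {
		// Wait for a free slot, or stop once a failure has cancelled ctx.
		// If both cases are ready, select picks one at random, so one
		// extra goroutine may still start after a failure.
		select {
		case <-ctx.Done():
			break loop
		case sem <- struct{}{}:
		}
		file := file // local copy for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			data, err := openFileAndGetBytes(file)
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
					cancel() // stop dispatching further files
				}
				return
			}
			results[file] = data
		}()
	}
	wg.Wait()
	return results, firstErr
}

func main() {
	data, err := readAll([]string{"file1", "file3", "file4"}, 3)
	fmt.Println(len(data), err)
}
```

With errgroup available, g.Go and g.Wait would replace the WaitGroup and the first-error bookkeeping, as in the snippet above.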

answered Mar 17, 2022 at 15:40
\$\endgroup\$
  • Thanks for the suggestion. Do you think there is any way to move this worker pool implementation out into its own classes and structs, and then just use those from my main application? I am trying to make it reusable by other pieces of code. Any help will be appreciated. Commented Mar 17, 2022 at 16:39
  • I wouldn't recommend trying to make a class or struct for this. One of the main principles of Go code is simplicity. This pattern is very common and easy to type out multiple times if you need it in multiple places. You could always put all the code into a function that takes the list of files as an argument, but I'm not sure if that's what you meant. Commented Mar 17, 2022 at 17:05
  • What I mean is that in the same class there are 2-3 different places where I need a worker pool, which is why I was thinking of separating it out for reuse. Otherwise I need to copy-paste the same piece of code into all of those places. What do you think now? Commented Mar 17, 2022 at 17:32
  • It's hard to give a recommendation without knowing more details. You could start by copy-pasting, and then once you see which parts of the logic are shared, move the common logic into a helper function. Commented Mar 17, 2022 at 17:53
  • Thanks, I think I get your point now. I have another question here where I could use some help. It is puzzling me because the memory footprint fluctuates a lot whenever I deserialize my bytes into that struct. Commented Mar 18, 2022 at 0:32
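As a rough illustration of the "helper function" idea from the comments, the sketch below pulls the shared logic into one function that takes the work items and a callback. It is one possible shape under stated assumptions, not a definitive design: runLimited and readFiles are hypothetical names, it uses only the standard library (sync.WaitGroup and sync.Once in place of errgroup), and it avoids generics so that it compiles on Go 1.17.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runLimited (hypothetical helper) calls fn once per item with at most limit
// calls in flight. After the first failure it cancels the context passed to
// fn, stops starting new work, and returns that first error.
func runLimited(ctx context.Context, items []string, limit int, fn func(context.Context, string) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	sem := make(chan struct{}, limit)

loop:
	for _, item := range items {
		select {
		case <-ctx.Done():
			break loop
		case sem <- struct{}{}:
		}
		item := item // local copy for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			if err := fn(ctx, item); err != nil {
				once.Do(func() {
					firstErr = err
					cancel()
				})
			}
		}()
	}
	wg.Wait()
	if firstErr != nil {
		return firstErr
	}
	return ctx.Err() // non-nil only if the caller's context was cancelled
}

// openFileAndGetBytes is the stub from the question: "file2" always fails.
func openFileAndGetBytes(file string) (string, error) {
	if file == "file2" {
		return "", fmt.Errorf("%s cannot be read", file)
	}
	return fmt.Sprintf("these are some bytes for file %s", file), nil
}

// readFiles is one example call site; each caller decides how to collect results.
func readFiles(files []string, limit int) (map[string]string, error) {
	var mu sync.Mutex
	out := make(map[string]string)
	err := runLimited(context.Background(), files, limit, func(_ context.Context, file string) error {
		data, err := openFileAndGetBytes(file)
		if err != nil {
			return err
		}
		mu.Lock()
		out[file] = data
		mu.Unlock()
		return nil
	})
	return out, err
}

func main() {
	data, err := readFiles([]string{"file1", "file3"}, 2)
	fmt.Println(len(data), err)
}
```

Each of the 2-3 call sites mentioned in the comments would then supply its own callback, while the semaphore, cancellation, and error handling stay in one place.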
