Here is an example where I'm trying to understand the concept of buffered channels. I have three functions and create a buffered channel of length 3. I also pass a WaitGroup to notify when all the goroutines are done, and finally collect the values with range.
Could you please review this code and tell me where I could improve?
package main

import (
	"fmt"
	"sync"
)

type f func(int, int, chan int, *sync.WaitGroup)

func add(x, y int, r chan int, wg *sync.WaitGroup) {
	fmt.Println("Started Adding function....")
	r <- (x + y)
	wg.Done()
}

func sub(x, y int, r chan int, wg *sync.WaitGroup) {
	fmt.Println("Started Difference function")
	r <- (x - y)
	wg.Done()
}

func prod(x, y int, r chan int, wg *sync.WaitGroup) {
	fmt.Println("Started Prod function")
	r <- (x * y)
	wg.Done()
}

func main() {
	var operations []f = []f{add, sub, prod}
	ch := make(chan int, len(operations))
	wg := sync.WaitGroup{}
	x, y := 10, 20
	wg.Add(len(operations))
	for _, i := range operations {
		go i(x, y, ch, &wg)
	}
	wg.Wait()
	close(ch)
	for val := range ch {
		fmt.Println(val)
	}
}
2 Answers
The code in the question cannot determine which result corresponds to which operation. Otherwise, the code is correct. Here are two alternatives for improving the code:
1. Eliminate the channel
- Change the operations to simple functions that return an int. This makes it easier to test and reason about the implementation of the operations.
- Collect the results in a slice instead of in a channel. With this change, we know that the result of operations[0] is at slice index 0, the result of operations[1] is at slice index 1, and so on.
- Move all the waitgroup and goroutine related code together in main. This makes the concurrency aspect of the program easier to understand.
Here's the code:
package main

import (
	"fmt"
	"sync"
)

type f func(int, int) int

func add(x, y int) int {
	return (x + y)
}

func sub(x, y int) int {
	return (x - y)
}

func prod(x, y int) int {
	return (x * y)
}

func main() {
	var operations []f = []f{add, sub, prod}
	x, y := 10, 20
	results := make([]int, len(operations))
	var wg sync.WaitGroup
	wg.Add(len(operations))
	for i, fn := range operations {
		go func(i int, fn f) {
			defer wg.Done()
			results[i] = fn(x, y)
		}(i, fn)
	}
	wg.Wait()
	fmt.Println(results)
}
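To illustrate the point about testability: once the operations are plain functions, they can be checked without any goroutines, channels, or waitgroups. The following is only a sketch; the `check` helper and its table of expected values are my own illustration, not part of the code above.

```go
package main

import "fmt"

func add(x, y int) int  { return x + y }
func sub(x, y int) int  { return x - y }
func prod(x, y int) int { return x * y }

// check is a hypothetical helper: it runs every case in a small table
// and returns a description of each mismatch it finds.
func check() []string {
	cases := []struct {
		name string
		fn   func(int, int) int
		x, y int
		want int
	}{
		{"add", add, 10, 20, 30},
		{"sub", sub, 10, 20, -10},
		{"prod", prod, 10, 20, 200},
	}
	var failures []string
	for _, c := range cases {
		if got := c.fn(c.x, c.y); got != c.want {
			failures = append(failures,
				fmt.Sprintf("%s(%d, %d) = %d, want %d", c.name, c.x, c.y, got, c.want))
		}
	}
	return failures
}

func main() {
	if failures := check(); len(failures) > 0 {
		fmt.Println(failures)
		return
	}
	fmt.Println("all operations OK")
}
```

In a real project the same table would more naturally live in a `_test.go` file and use the `testing` package.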
2. Eliminate the waitgroup
The application can receive the known number of values sent to the channel instead of receiving until the channel is closed. If the channel is not closed, then there's no need for the waitgroup. What's more, there's no need to buffer the channel.
package main

import (
	"fmt"
)

type f func(int, int, chan int)

func add(x, y int, r chan int) {
	r <- (x + y)
}

func sub(x, y int, r chan int) {
	r <- (x - y)
}

func prod(x, y int, r chan int) {
	r <- (x * y)
}

func main() {
	var operations []f = []f{add, sub, prod}
	ch := make(chan int)
	x, y := 10, 20
	for _, i := range operations {
		go i(x, y, ch)
	}
	for range operations {
		val := <-ch
		fmt.Println(val)
	}
}
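The counted-receive version still prints values in whatever order the goroutines happen to send them. One possible way to keep the link between operation and result, while still avoiding the waitgroup, is to send the operation's index along with its value. This is a sketch under my own assumptions: the `indexed` type and the `run` helper are hypothetical names, and the operations are written as plain functions as in the first alternative.

```go
package main

import "fmt"

// indexed is a hypothetical helper type: it pairs a result with the
// position of the operation that produced it.
type indexed struct {
	i, v int
}

func add(x, y int) int  { return x + y }
func sub(x, y int) int  { return x - y }
func prod(x, y int) int { return x * y }

// run starts one goroutine per operation and receives exactly
// len(ops) values, so neither a WaitGroup nor close is needed.
func run(ops []func(int, int) int, x, y int) []int {
	ch := make(chan indexed) // unbuffered: every send meets a receive below
	for i, op := range ops {
		go func(i int, op func(int, int) int) {
			ch <- indexed{i, op(x, y)}
		}(i, op)
	}
	results := make([]int, len(ops))
	for range ops {
		r := <-ch
		results[r.i] = r.v // store by operation index, not arrival order
	}
	return results
}

func main() {
	fmt.Println(run([]func(int, int) int{add, sub, prod}, 10, 20)) // prints [30 -10 200]
}
```

Only the receiving goroutine writes to `results`, so the slice needs no extra synchronization.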
I've mostly written the review comments inline with the code. I could have simplified the code as @thwd suggested, but since you're learning and this is not an actual use case, I'm not changing the logic.
Also, to know about buffered channels better, do read this.
package main

import (
	"fmt"
	"sync"
)

// Goroutines can be scheduled in any order, so it's better to store
// the operation together with its result.
type result struct {
	op string
	v  int
}

// Prefer unidirectional channels when a bidirectional one is not
// needed. The direction also documents how each function or method
// uses the channel (e.g. producer, consumer).
type fn func(int, int, chan<- result, *sync.WaitGroup)

func add(x, y int, r chan<- result, wg *sync.WaitGroup) {
	fmt.Println("op: add")
	r <- result{"add", x + y}
	wg.Done()
}

func sub(x, y int, r chan<- result, wg *sync.WaitGroup) {
	fmt.Println("op: sub")
	r <- result{"sub", x - y}
	wg.Done()
}

func prod(x, y int, r chan<- result, wg *sync.WaitGroup) {
	fmt.Println("op: prod")
	r <- result{"prod", x * y}
	wg.Done()
}

func main() {
	// Multiple declarations can be grouped like this. Whether to do
	// so is up to the developer; the goal is clean code.
	var (
		fops = []fn{add, sub, prod}
		ch   = make(chan result, len(fops))
		wg   sync.WaitGroup
		x, y = 10, 20
	)
	wg.Add(len(fops))
	for _, f := range fops {
		go f(x, y, ch, &wg)
	}
	wg.Wait()
	close(ch)
	// Either use range, or run a counted loop over [0, len(fops));
	// that would work as well.
	for v := range ch {
		fmt.Println(v)
	}
}
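To make the remark about unidirectional channels concrete, here is a small sketch of a producer/consumer pair. The `producer` and `consumer` functions are hypothetical and unrelated to the three operations; they only show how the direction in the parameter type restricts what each side may do.

```go
package main

import "fmt"

// producer may only send on out; receiving from it is a compile error.
func producer(n int, out chan<- int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // the sending side closes the channel
}

// consumer may only receive from in; it returns the sum of all values.
func consumer(in <-chan int) int {
	sum := 0
	for v := range in {
		sum += v
	}
	return sum
}

func main() {
	ch := make(chan int) // bidirectional here, narrowed at each call
	go producer(4, ch)
	fmt.Println(consumer(ch)) // prints 10
}
```

A bidirectional channel converts implicitly to either direction when passed as an argument, so the caller needs no extra code.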
Add one more send such as
r <- (x/y)
(or any other write to the channel) in any of the routines and the waitgroup will never finish (the channel buffer would be full, and a write would block). We use waitgroups specifically when we have routines do work and we can't sensibly buffer the channel/output. We consume the channel while the work is being done, and wait for the waitgroup to be done to tidy up (close the channel etc.).
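The pattern described in this comment could look roughly like the sketch below: the main goroutine drains the channel while the workers run, and a separate goroutine waits on the waitgroup and then closes the channel. The `produce` and `collect` functions and the value scheme `id*100 + k` are hypothetical, chosen only so that each worker sends more values than a small buffer would hold.

```go
package main

import (
	"fmt"
	"sync"
)

// produce is a hypothetical worker: it sends n values on out and
// signals the WaitGroup when it has finished.
func produce(id, n int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for k := 0; k < n; k++ {
		out <- id*100 + k
	}
}

// collect runs the workers and returns everything they sent.
func collect(workers, perWorker int) []int {
	ch := make(chan int) // unbuffered on purpose
	var wg sync.WaitGroup
	wg.Add(workers)
	for id := 0; id < workers; id++ {
		go produce(id, perWorker, ch, &wg)
	}
	// Wait in a separate goroutine so the receive loop below can
	// drain the channel while the workers are still sending.
	go func() {
		wg.Wait()
		close(ch)
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(len(collect(3, 5)), "values received") // prints "15 values received"
}
```

Because the receive loop runs concurrently with the senders, the number of writes per routine no longer has to match the buffer size.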