
Task: per-host concurrency limits for a web crawler (map[string]Semaphore).

I considered the chan struct{} (or chan bool) approach, but it would not make the code much simpler, because the main hurdle is deleting unused keys from the map. Also, the semaphore takes constant memory, which is a nice property, whereas a buffered channel allocates a backing buffer proportional to the configured limit.

I started with a Semaphore built on sync.Cond Wait/Signal (the desired semaphore behavior is described in the code comments). Then I created a map[string]*Semaphore with a Mutex around all map operations; once the semaphore is in hand, I Acquire/Release it, which blocks the caller when needed without blocking other access to the map.

To remove unused semaphores from the map, I keep a separate reference counter in the semaphore struct, modified only while the map lock is held. Its value sometimes differs from semaphore.value. When this counter drops to zero, I know no goroutines hold a pointer to the semaphore from the map (except possibly one that is about to do the final Release()), so it is safe to delete the key and reclaim memory.

Essentially it works and the test passes, but I would really appreciate any feedback on this approach.

Code link: https://gist.github.com/4130335

// Package limitmap provides a map of semaphores to limit concurrency against string keys.
//
// Usage:
//	limits := NewLimitMap()
//	func process(url *url.URL, rch chan *http.Response) {
//		// At most 2 concurrent requests to each host.
//		limits.Acquire(url.Host, 2)
//		defer limits.Release(url.Host)
//		r, err := http.Get(url.String())
//		rch <- r
//	}
//	for url := range urlChan {
//		go process(url, rch)
//	}
package limitmap
import (
 "sync"
)
// Internal structure, may be changed.
// Requirements for this data structure:
// * Acquire() does not block until the internal counter reaches the configured maximum.
// * Release() decrements the internal counter and wakes up one goroutine blocked in Acquire().
//   Calling Release() when the internal counter is zero is a programming error and panics.
type Semaphore struct {
 // Number of Acquires - Releases. When this goes to zero, this structure is removed from map.
 // Only updated inside LimitMap.lk lock.
 refs int
 max uint
 value uint
 wait sync.Cond
}
func NewSemaphore(max uint) *Semaphore {
 return &Semaphore{
 max: max,
 wait: sync.Cond{L: new(sync.Mutex)},
 }
}
func (s *Semaphore) Acquire() uint {
 s.wait.L.Lock()
 defer s.wait.L.Unlock()
 for {
  if s.value < s.max {
   s.value++
   return s.value
  }
  s.wait.Wait()
 }
}
func (s *Semaphore) Release() (result uint) {
 s.wait.L.Lock()
 defer s.wait.L.Unlock()
 // value is unsigned, so check before decrementing: a "< 0" test after
 // the decrement can never fire and the decrement itself would wrap around.
 if s.value == 0 {
  panic("Semaphore Release without Acquire")
 }
 s.value--
 s.wait.Signal()
 return s.value
}
type LimitMap struct {
 lk sync.Mutex
 limits map[string]*Semaphore
 wg sync.WaitGroup
}
func NewLimitMap() *LimitMap {
 return &LimitMap{
 limits: make(map[string]*Semaphore),
 }
}
func (m *LimitMap) Acquire(key string, max uint) {
 m.lk.Lock()
 l, ok := m.limits[key]
 if !ok {
  l = NewSemaphore(max)
  m.limits[key] = l
 }
 l.refs++
 m.lk.Unlock()
 m.wg.Add(1)
 // Sanity check: after a successful Acquire the value must be in [1, max].
 if x := l.Acquire(); x == 0 || x > l.max {
  panic("LimitMap internal error: semaphore value out of range after Acquire")
 }
}
func (m *LimitMap) Release(key string) {
 m.lk.Lock()
 l, ok := m.limits[key]
 if !ok {
  panic("LimitMap: key not in map. Possible reason: Release without Acquire.")
 }
 l.refs--
 if l.refs < 0 {
  panic("LimitMap internal error: refs < 0.")
 }
 if l.refs == 0 {
  delete(m.limits, key)
 }
 m.lk.Unlock()
 // Sanity check: after a Release the value must be below max.
 if x := l.Release(); x >= l.max {
  panic("LimitMap internal error: semaphore value out of range after Release")
 }
 m.wg.Done()
}
// Wait blocks until every Acquire has been matched by a Release.
func (m *LimitMap) Wait() {
 m.wg.Wait()
}
func (m *LimitMap) Size() (keys int, total int) {
 m.lk.Lock()
 keys = len(m.limits)
 for _, l := range m.limits {
 total += int(l.value)
 }
 m.lk.Unlock()
 return
}
asked Sep 26, 2012 at 11:05

2 Answers


For my initial implementation, I would personally apply the common semaphore pattern using buffered channels to control the number of running goroutines. The simplest application looks like the code below (also available in this gist):

func doWork(s string, ch <-chan struct{}, wg *sync.WaitGroup) {
 defer func() {
  <-ch      // free up a slot in the semaphore
  wg.Done() // tell the WaitGroup we're finished
 }()
 fmt.Println(s)
}
func execute(work []string) {
 wg := &sync.WaitGroup{}
 sema := make(chan struct{}, 10) // concurrency limit of 10
 for _, url := range work {
  // if there are 10 items in flight, the channel is full and this send blocks;
  // it unblocks when a worker finishes
  sema <- struct{}{}
  wg.Add(1)
  go doWork(url, sema, wg)
 }
 // wait for all goroutines to finish
 wg.Wait()
 // close the channel as nothing else will write to it
 close(sema)
}

If you enhanced this example a bit to use one channel per map key, where you then pass the channel in to the worker function to be read from, I think it'll work for you.

If you're curious why I decided to use a struct{} type for the channel, Dave Cheney has a good post explaining it here.

Sᴀᴍ Onᴇᴌᴀ
answered Apr 17, 2018 at 15:44

Tim H's solution with a buffered channel is a good start, but it has a couple of downsides:

  • You need to perform the channel and WaitGroup operations in tandem. Failing to do so leads to deadlocks (or worse: a silently unlimited work queue).
  • You have to manually handle blocking and timeouts on the channel sends that enforce the limit.

I would propose using something like github.com/remeh/sizedwaitgroup, which is a drop-in replacement for WaitGroup and hides all the complexity behind a clean interface: New, AddWithContext, and Done.

answered Apr 9, 2022 at 1:39
  • \$\begingroup\$ I think you are reviewing description, not the code. There are no buffered channels in code. \$\endgroup\$ Commented Apr 11, 2022 at 9:47
  • \$\begingroup\$ I was referring to the Tim's answer, let me clarify this a bit. \$\endgroup\$ Commented Apr 11, 2022 at 12:42
  • \$\begingroup\$ Then you may want to make it a comment to another answer. \$\endgroup\$ Commented Apr 11, 2022 at 22:16
