书籍:The Way To Go,第四部分
月光独奏 · · 2794 次点击 · · 开始浏览Channels
var ch1 chan string ch1 = make(chan string) ch1 := make(chan string) buf := 100 ch1 := make(chan string, buf) chanOfChans := make(chan chan int) funcChan := chan func()
// main connects a sending and a receiving goroutine through one unbuffered
// string channel, then sleeps for one second so both get time to run before
// the program exits.
func main() {
	cities := make(chan string)
	go sendData(cities)
	go getData(cities)
	time.Sleep(time.Second)
}
// sendData sends three city names on ch, in a fixed order. The channel is
// left open afterwards.
func sendData(ch chan string) {
	for _, city := range []string{"Washington", "Tripoli", "London"} {
		ch <- city
	}
}
// getData receives strings from ch in an endless loop and prints each one
// followed by a space. It never returns.
func getData(ch chan string) {
	for {
		fmt.Printf("%s ", <-ch)
	}
}
Semaphore pattern
// Empty is the element type of the signalling channel; only the arrival of a
// value matters, not its content.
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N) // semaphore; capacity N, so none of the N senders below blocks
...
// Start one goroutine per element of data. i and xi are passed as arguments,
// so every goroutine works on its own copies.
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i,xi)
sem <- empty // signal that this element is done
} (i, xi)
}
for i := 0; i < N; i++ { // wait for goroutines to finish
<-sem
}
Channel Factory pattern
func main() {
stream := pump()
go suck(stream) // shortened : go suck( pump() )
time.Sleep(1e9)
}
// pump creates an unbuffered int channel, starts a goroutine that sends
// 0, 1, 2, ... on it without end, and returns the channel.
func pump() chan int {
	out := make(chan int)
	go func() {
		next := 0
		for {
			out <- next
			next++
		}
	}()
	return out
}
// suck receives ints from ch in an endless loop and prints each one on its
// own line. It never returns.
func suck(ch chan int) {
	for {
		v := <-ch
		fmt.Println(v)
	}
}
// suck starts a goroutine that prints every value received from ch, one per
// line, and returns immediately. The goroutine ends once ch has been closed
// and drained.
func suck(ch chan int) {
	go func() {
		for {
			v, ok := <-ch
			if !ok {
				return
			}
			fmt.Println(v)
		}
	}()
}
Channel Directionality
// send_only is a send-only channel: code holding it may only send values
// into it.
var send_only chan<- int
// recv_only is a receive-only channel: code holding it may only receive
// values from it and may not close it.
var recv_only <-chan int
...
var c = make(chan int) // bidirectional
// The bidirectional channel c is passed where a directional channel is
// expected: source gets a send-only view, sink a receive-only view.
go source(c)
go sink(c)
// source sends the value 1 on the send-only channel ch forever.
func source(ch chan<- int) {
	for {
		ch <- 1
	}
}
// sink receives values from the receive-only channel ch forever and discards
// them.
func sink(ch <-chan int) {
	for {
		<-ch
	}
}
...
// closing a channel
// sendData sends five city names on ch, in a fixed order, and then closes the
// channel to tell the receiver that no more values will follow.
func sendData(ch chan string) {
	cities := [...]string{"Washington", "Tripoli", "London", "Beijing", "Tokio"}
	for _, city := range cities {
		ch <- city
	}
	close(ch)
}
// getData prints every string received from ch, each followed by a space,
// and returns once ch has been closed and all pending values are consumed.
func getData(ch chan string) {
	for input := range ch {
		fmt.Printf("%s ", input)
	}
}
Switching between goroutines with select
// select chooses among the listed communication cases.
select {
case u:= <- ch1: // a value was received from ch1 and is bound to u
...
case v:= <- ch2: // a value was received from ch2 and is bound to v
...
default: // no value ready to be received
...
}
If all cases are blocked, select waits until one of them can proceed.
If multiple cases can proceed, it chooses one of them at random.
When none of the channel operations can proceed and a default clause is present, the default clause is executed: the default is always runnable (that is, ready to execute). Using a send operation in a select statement with a default case guarantees that the send will be non-blocking!
channels with timeouts and tickers
// func Tick(d Duration) <-chan Time
import "time"

rate_per_sec := 10
// Derive the tick interval from the rate instead of hard-coding it:
// one second divided by 10 calls per second is 100ms. Duration lives in
// package time and must be qualified.
var dur time.Duration = time.Second / time.Duration(rate_per_sec)
chRate := time.Tick(dur) // every 1/10th of a second
for req := range requests {
	<-chRate // rate limit our Service.Method RPC calls
	go client.Call("Service.Method", req, ...)
}
// func After(d Duration) <-chan Time
// main prints "tick." on every 100ms tick, prints "BOOM!" and returns when
// the 500ms timeout fires, and otherwise prints " ." and sleeps 50ms whenever
// neither channel has a value ready.
func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	for {
		select {
		case <-boom:
			fmt.Println("BOOM!")
			return
		case <-tick:
			fmt.Println("tick.")
		default:
			// Nothing ready yet: idle briefly before polling again.
			fmt.Println(" .")
			time.Sleep(50 * time.Millisecond)
		}
	}
}
using recover with goroutines
// server takes work items from workChan until the channel is closed and
// hands each one to safelyDo in its own goroutine.
func server(workChan <-chan *Work) {
	for {
		work, ok := <-workChan
		if !ok {
			return
		}
		go safelyDo(work)
	}
}
// safelyDo runs do(work). If do panics, the deferred function recovers the
// panic, logs the panic value together with the work item, and safelyDo
// returns normally instead of propagating the panic.
func safelyDo(work *Work) {
	defer func() {
		if err := recover(); err != nil {
			// recover returns an arbitrary value, not necessarily a string or
			// an error; %v formats any of them, whereas %s would print
			// something like %!s(int=42) for panic(42).
			log.Printf("work failed with %v in %v:", err, work)
		}
	}()
	do(work)
}
Tasks and Worker Processes
// Pool is a shared list of pending tasks together with the mutex that
// guards it.
type Pool struct {
// Mu is locked by Worker around every access to Tasks.
Mu sync.Mutex
// Tasks holds the pending tasks; Worker removes them from the front.
Tasks []Task
}
// Worker repeatedly removes the first task from pool under the pool's mutex
// and processes it outside the lock. It returns when the pool has no tasks
// left.
func Worker(pool *Pool) {
	for {
		pool.Mu.Lock()
		// begin critical section:
		if len(pool.Tasks) == 0 {
			// Indexing an empty slice would panic with the mutex still held,
			// so release the lock and stop instead.
			pool.Mu.Unlock()
			return
		}
		task := pool.Tasks[0]       // take the first task
		pool.Tasks = pool.Tasks[1:] // update the pool
		// end critical section
		pool.Mu.Unlock()
		process(task)
	}
}
// main creates the pending and done channels, starts sendWork to feed
// pending, launches N Worker goroutines that move tasks from pending to done,
// and finally consumes the finished tasks on the calling goroutine.
func main() {
	pending := make(chan *Task)
	done := make(chan *Task)
	go sendWork(pending) // put tasks with work
	for started := 0; started < N; started++ {
		go Worker(pending, done)
	}
	consumeWork(done)
}
// Worker loops forever: it receives a task from in, processes it, and then
// sends the same task on out.
func Worker(in, out chan *Task) {
	for {
		task := <-in
		process(task)
		out <- task
	}
}
rule - use locking (mutexes) when: caching information in a shared data structure; holding state information;
rule - use channels when: communicating asynchronous results; distributing units of work; passing ownership of data;
lazy generator
// resume is the channel generateInteger reads from; main sets it to the
// channel returned by integers.
var resume chan int
// integers returns an unbuffered channel fed by a goroutine that sends
// 0, 1, 2, ... without end. Each value is produced only when a receiver is
// ready to take it.
func integers() chan int {
	out := make(chan int)
	go func() {
		for n := 0; ; n++ {
			out <- n
		}
	}()
	return out
}
// generateInteger receives one value from the package-level resume channel
// and returns it.
func generateInteger() int {
	next := <-resume
	return next
}
// main points resume at a fresh integer generator and prints the first two
// generated values.
func main() {
	resume = integers()
	for i := 0; i < 2; i++ {
		fmt.Println(generateInteger()) // prints 0, then 1
	}
}
Benchmarking goroutines
// main runs each channel benchmark through testing.Benchmark and prints its
// result, labelled "sync" and "buffered" respectively.
func main() {
	syncResult := testing.Benchmark(BenchmarkChannelSync)
	fmt.Println("sync", syncResult.String())

	bufferedResult := testing.Benchmark(BenchmarkChannelBuffered)
	fmt.Println("buffered", bufferedResult.String())
}
// BenchmarkChannelSync sends b.N integers over an unbuffered channel from a
// goroutine, closes the channel, and drains it on the calling goroutine.
func BenchmarkChannelSync(b *testing.B) {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for sent := 0; sent < b.N; sent++ {
			ch <- sent
		}
	}()
	// Drain until the sender closes the channel.
	for range ch {
	}
}
// BenchmarkChannelBuffered sends b.N integers over a channel with a buffer of
// 128 from a goroutine, closes the channel, and drains it on the calling
// goroutine.
func BenchmarkChannelBuffered(b *testing.B) {
	ch := make(chan int, 128)
	go func() {
		defer close(ch)
		for sent := 0; sent < b.N; sent++ {
			ch <- sent
		}
	}()
	// Drain until the sender closes the channel.
	for range ch {
	}
}
// Output:
// Windows: N Time 1 op Operations per sec
// sync 1000000 2443 ns/op --> 409 332 / s
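// buffered 1000000 4850 ns/op --> 810 477 / s
// NOTE: the two buffered figures disagree: 4850 ns/op is about 206 186 ops/s, while 810 477 ops/s is about 1234 ns/op; verify against the original measurements.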
implement a mutex
/* mutexes */
// Lock implements the locking half of a mutex by calling s.P(1).
// NOTE(review): P is defined outside this excerpt; presumably it acquires
// one unit of the semaphore — confirm.
func (s semaphore) Lock() {
s.P(1)
}
// Unlock implements the unlocking half of a mutex by calling s.V(1).
// NOTE(review): V is defined outside this excerpt; presumably it releases
// one unit of the semaphore — confirm.
func (s semaphore) Unlock() {
s.V(1)
}
/* signal-wait */
// Wait calls s.P(n).
// NOTE(review): P is defined outside this excerpt; presumably this waits
// for n signals — confirm.
func (s semaphore) Wait(n int) {
s.P(n)
}
// Signal calls s.V(1).
// NOTE(review): V is defined outside this excerpt; presumably this
// delivers one signal to a waiter — confirm.
func (s semaphore) Signal() {
s.V(1)
}
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式, **粗体**、~~删除线~~、
`单行代码` - 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
Channels
var ch1 chan string ch1 = make(chan string) ch1 := make(chan string) buf := 100 ch1 := make(chan string, buf) chanOfChans := make(chan chan int) funcChan := chan func()
// main connects a sending and a receiving goroutine through one unbuffered
// string channel, then sleeps for one second so both get time to run before
// the program exits.
func main() {
	cities := make(chan string)
	go sendData(cities)
	go getData(cities)
	time.Sleep(time.Second)
}
// sendData sends three city names on ch, in a fixed order. The channel is
// left open afterwards.
func sendData(ch chan string) {
	for _, city := range []string{"Washington", "Tripoli", "London"} {
		ch <- city
	}
}
// getData receives strings from ch in an endless loop and prints each one
// followed by a space. It never returns.
func getData(ch chan string) {
	for {
		fmt.Printf("%s ", <-ch)
	}
}
Semaphore pattern
// Empty is the element type of the signalling channel; only the arrival of a
// value matters, not its content.
type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N) // semaphore; capacity N, so none of the N senders below blocks
...
// Start one goroutine per element of data. i and xi are passed as arguments,
// so every goroutine works on its own copies.
for i, xi := range data {
go func (i int, xi float64) {
res[i] = doSomething(i,xi)
sem <- empty // signal that this element is done
} (i, xi)
}
for i := 0; i < N; i++ { // wait for goroutines to finish
<-sem
}
Channel Factory pattern
func main() {
stream := pump()
go suck(stream) // shortened : go suck( pump() )
time.Sleep(1e9)
}
// pump creates an unbuffered int channel, starts a goroutine that sends
// 0, 1, 2, ... on it without end, and returns the channel.
func pump() chan int {
	out := make(chan int)
	go func() {
		next := 0
		for {
			out <- next
			next++
		}
	}()
	return out
}
// suck receives ints from ch in an endless loop and prints each one on its
// own line. It never returns.
func suck(ch chan int) {
	for {
		v := <-ch
		fmt.Println(v)
	}
}
// suck starts a goroutine that prints every value received from ch, one per
// line, and returns immediately. The goroutine ends once ch has been closed
// and drained.
func suck(ch chan int) {
	go func() {
		for {
			v, ok := <-ch
			if !ok {
				return
			}
			fmt.Println(v)
		}
	}()
}
Channel Directionality
// send_only is a send-only channel: code holding it may only send values
// into it.
var send_only chan<- int
// recv_only is a receive-only channel: code holding it may only receive
// values from it and may not close it.
var recv_only <-chan int
...
var c = make(chan int) // bidirectional
// The bidirectional channel c is passed where a directional channel is
// expected: source gets a send-only view, sink a receive-only view.
go source(c)
go sink(c)
// source sends the value 1 on the send-only channel ch forever.
func source(ch chan<- int) {
	for {
		ch <- 1
	}
}
// sink receives values from the receive-only channel ch forever and discards
// them.
func sink(ch <-chan int) {
	for {
		<-ch
	}
}
...
// closing a channel
// sendData sends five city names on ch, in a fixed order, and then closes the
// channel to tell the receiver that no more values will follow.
func sendData(ch chan string) {
	cities := [...]string{"Washington", "Tripoli", "London", "Beijing", "Tokio"}
	for _, city := range cities {
		ch <- city
	}
	close(ch)
}
// getData prints every string received from ch, each followed by a space,
// and returns once ch has been closed and all pending values are consumed.
func getData(ch chan string) {
	for input := range ch {
		fmt.Printf("%s ", input)
	}
}
Switching between goroutines with select
// select chooses among the listed communication cases.
select {
case u:= <- ch1: // a value was received from ch1 and is bound to u
...
case v:= <- ch2: // a value was received from ch2 and is bound to v
...
default: // no value ready to be received
...
}
If all cases are blocked, select waits until one of them can proceed.
If multiple cases can proceed, it chooses one of them at random.
When none of the channel operations can proceed and a default clause is present, the default clause is executed: the default is always runnable (that is, ready to execute). Using a send operation in a select statement with a default case guarantees that the send will be non-blocking!
channels with timeouts and tickers
// func Tick(d Duration) <-chan Time
import "time"

rate_per_sec := 10
// Derive the tick interval from the rate instead of hard-coding it:
// one second divided by 10 calls per second is 100ms. Duration lives in
// package time and must be qualified.
var dur time.Duration = time.Second / time.Duration(rate_per_sec)
chRate := time.Tick(dur) // every 1/10th of a second
for req := range requests {
	<-chRate // rate limit our Service.Method RPC calls
	go client.Call("Service.Method", req, ...)
}
// func After(d Duration) <-chan Time
// main prints "tick." on every 100ms tick, prints "BOOM!" and returns when
// the 500ms timeout fires, and otherwise prints " ." and sleeps 50ms whenever
// neither channel has a value ready.
func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	for {
		select {
		case <-boom:
			fmt.Println("BOOM!")
			return
		case <-tick:
			fmt.Println("tick.")
		default:
			// Nothing ready yet: idle briefly before polling again.
			fmt.Println(" .")
			time.Sleep(50 * time.Millisecond)
		}
	}
}
using recover with goroutines
// server takes work items from workChan until the channel is closed and
// hands each one to safelyDo in its own goroutine.
func server(workChan <-chan *Work) {
	for {
		work, ok := <-workChan
		if !ok {
			return
		}
		go safelyDo(work)
	}
}
// safelyDo runs do(work). If do panics, the deferred function recovers the
// panic, logs the panic value together with the work item, and safelyDo
// returns normally instead of propagating the panic.
func safelyDo(work *Work) {
	defer func() {
		if err := recover(); err != nil {
			// recover returns an arbitrary value, not necessarily a string or
			// an error; %v formats any of them, whereas %s would print
			// something like %!s(int=42) for panic(42).
			log.Printf("work failed with %v in %v:", err, work)
		}
	}()
	do(work)
}
Tasks and Worker Processes
// Pool is a shared list of pending tasks together with the mutex that
// guards it.
type Pool struct {
// Mu is locked by Worker around every access to Tasks.
Mu sync.Mutex
// Tasks holds the pending tasks; Worker removes them from the front.
Tasks []Task
}
// Worker repeatedly removes the first task from pool under the pool's mutex
// and processes it outside the lock. It returns when the pool has no tasks
// left.
func Worker(pool *Pool) {
	for {
		pool.Mu.Lock()
		// begin critical section:
		if len(pool.Tasks) == 0 {
			// Indexing an empty slice would panic with the mutex still held,
			// so release the lock and stop instead.
			pool.Mu.Unlock()
			return
		}
		task := pool.Tasks[0]       // take the first task
		pool.Tasks = pool.Tasks[1:] // update the pool
		// end critical section
		pool.Mu.Unlock()
		process(task)
	}
}
// main creates the pending and done channels, starts sendWork to feed
// pending, launches N Worker goroutines that move tasks from pending to done,
// and finally consumes the finished tasks on the calling goroutine.
func main() {
	pending := make(chan *Task)
	done := make(chan *Task)
	go sendWork(pending) // put tasks with work
	for started := 0; started < N; started++ {
		go Worker(pending, done)
	}
	consumeWork(done)
}
// Worker loops forever: it receives a task from in, processes it, and then
// sends the same task on out.
func Worker(in, out chan *Task) {
	for {
		task := <-in
		process(task)
		out <- task
	}
}
rule - use locking (mutexes) when: caching information in a shared data structure; holding state information;
rule - use channels when: communicating asynchronous results; distributing units of work; passing ownership of data;
lazy generator
// resume is the channel generateInteger reads from; main sets it to the
// channel returned by integers.
var resume chan int
// integers returns an unbuffered channel fed by a goroutine that sends
// 0, 1, 2, ... without end. Each value is produced only when a receiver is
// ready to take it.
func integers() chan int {
	out := make(chan int)
	go func() {
		for n := 0; ; n++ {
			out <- n
		}
	}()
	return out
}
// generateInteger receives one value from the package-level resume channel
// and returns it.
func generateInteger() int {
	next := <-resume
	return next
}
// main points resume at a fresh integer generator and prints the first two
// generated values.
func main() {
	resume = integers()
	for i := 0; i < 2; i++ {
		fmt.Println(generateInteger()) // prints 0, then 1
	}
}
Benchmarking goroutines
// main runs each channel benchmark through testing.Benchmark and prints its
// result, labelled "sync" and "buffered" respectively.
func main() {
	syncResult := testing.Benchmark(BenchmarkChannelSync)
	fmt.Println("sync", syncResult.String())

	bufferedResult := testing.Benchmark(BenchmarkChannelBuffered)
	fmt.Println("buffered", bufferedResult.String())
}
// BenchmarkChannelSync sends b.N integers over an unbuffered channel from a
// goroutine, closes the channel, and drains it on the calling goroutine.
func BenchmarkChannelSync(b *testing.B) {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for sent := 0; sent < b.N; sent++ {
			ch <- sent
		}
	}()
	// Drain until the sender closes the channel.
	for range ch {
	}
}
// BenchmarkChannelBuffered sends b.N integers over a channel with a buffer of
// 128 from a goroutine, closes the channel, and drains it on the calling
// goroutine.
func BenchmarkChannelBuffered(b *testing.B) {
	ch := make(chan int, 128)
	go func() {
		defer close(ch)
		for sent := 0; sent < b.N; sent++ {
			ch <- sent
		}
	}()
	// Drain until the sender closes the channel.
	for range ch {
	}
}
// Output:
// Windows: N Time 1 op Operations per sec
// sync 1000000 2443 ns/op --> 409 332 / s
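// buffered 1000000 4850 ns/op --> 810 477 / s
// NOTE: the two buffered figures disagree: 4850 ns/op is about 206 186 ops/s, while 810 477 ops/s is about 1234 ns/op; verify against the original measurements.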
implement a mutex
/* mutexes */
// Lock implements the locking half of a mutex by calling s.P(1).
// NOTE(review): P is defined outside this excerpt; presumably it acquires
// one unit of the semaphore — confirm.
func (s semaphore) Lock() {
s.P(1)
}
// Unlock implements the unlocking half of a mutex by calling s.V(1).
// NOTE(review): V is defined outside this excerpt; presumably it releases
// one unit of the semaphore — confirm.
func (s semaphore) Unlock() {
s.V(1)
}
/* signal-wait */
// Wait calls s.P(n).
// NOTE(review): P is defined outside this excerpt; presumably this waits
// for n signals — confirm.
func (s semaphore) Wait(n int) {
s.P(n)
}
// Signal calls s.V(1).
// NOTE(review): V is defined outside this excerpt; presumably this
// delivers one signal to a waiter — confirm.
func (s semaphore) Signal() {
s.V(1)
}