Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Update queue.go #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
tangseng wants to merge 1 commit into huichen:master
base: master
Choose a base branch
Loading
from tangseng:master
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions queue.go
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Queue struct {
runnerChannel chan []Task
isInitialized bool
numTasks uint64
closed bool
timeOut int64
}

type InitOptions struct {
Expand All @@ -29,6 +31,9 @@ type InitOptions struct {

// 批处理最大的任务数目。
NumTasksPerBatch int

//超时时间,0为不设超时
TimeOut int64
}

// 初始化任务队列,并开始计时。
Expand All @@ -45,6 +50,7 @@ func (q *Queue) Init(options InitOptions) {
} else {
q.numTasksPerBatch = options.NumTasksPerBatch
}
q.timeOut = options.TimeOut

runtime.GOMAXPROCS(runtime.NumCPU())
if options.NumWorkers <= 0 {
Expand All @@ -64,6 +70,9 @@ func (q *Queue) AddTask(delay uint64, timeout uint64, task Task) {
if !q.isInitialized {
log.Fatal("必须先初始化batchqueue")
}
if q.closed {
return
}
runner := new(Runner)
runner.task = task
runner.time = q.Now() + delay
Expand All @@ -80,6 +89,9 @@ func (q *Queue) RemoveTasks(task Task) {
if !q.isInitialized {
log.Fatal("必须先初始化batchqueue")
}
if q.closed {
return
}

q.Lock()
current := q.taskList.head
Expand Down Expand Up @@ -140,6 +152,12 @@ func (q *Queue) start() {
for {
q.Lock()
if q.taskList.head == nil {
q.Unlock()
if q.closed {
close(q.runnerChannel)
break
}
time.Sleep(time.Millisecond * 10)
continue
}
tasks := make([]Task, q.numTasksPerBatch)
Expand Down Expand Up @@ -202,9 +220,28 @@ func (q *Queue) start() {

func (q *Queue) worker() {
for {
tasks := <-q.runnerChannel
tasks, isclose := <-q.runnerChannel
if !isclose && q.closed {
break
}
if len(tasks) > 0 {
tasks[0].BatchRun(q, tasks)
if q.timeOut > 0 {
insert := make(chan bool)
go func(){
tasks[0].BatchRun(q, tasks)
insert <- true
}()
select {
case <-time.After(time.Duration(q.timeOut)):
case <-insert:
}
} else {
tasks[0].BatchRun(q, tasks)
}
}
}
}

func (q *Queue) Close() {
q.closed = true
}

AltStyle によって変換されたページ (->オリジナル) /