Skip to content

Commit fae6595

Browse files
authored
Feature/task redis server v7 (#13)
* Add StopWithTimeout method. * Upgrade task redis lib dependency: from v8 to v9.
1 parent 096b4c8 commit fae6595

File tree

3 files changed

+40
-2
lines changed

3 files changed

+40
-2
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/go-playground/validator/v10 v10.9.0
1717
github.com/go-redis/redis/v8 v8.4.3
1818
github.com/panjf2000/ants/v2 v2.4.3
19+
github.com/redis/go-redis/v9 v9.3.0
1920
github.com/relvacode/iso8601 v1.1.0
2021
github.com/spf13/pflag v1.0.3
2122
github.com/stretchr/testify v1.7.0

go.sum

+8-1
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,13 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
6060
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
6161
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
6262
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
63-
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
63+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
64+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
65+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
66+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
6467
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
68+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
69+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
6570
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
6671
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
6772
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
@@ -223,6 +228,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
223228
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
224229
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
225230
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
231+
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
232+
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
226233
github.com/relvacode/iso8601 v1.1.0 h1:2nV8sp0eOjpoKQ2vD3xSDygsjAx37NHG2UlZiCkDH4I=
227234
github.com/relvacode/iso8601 v1.1.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I=
228235
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=

task/task.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"sync/atomic"
2222
"time"
2323

24-
"github.com/go-redis/redis/v8"
2524
"github.com/panjf2000/ants/v2"
25+
"github.com/redis/go-redis/v9"
2626
)
2727

2828
const (
@@ -194,6 +194,36 @@ func (scheduler *Scheduler) Stop(force bool) {
194194
}
195195
}
196196

197+
// StopWithTimeout stops the current scheduler, waiting running tasks at most `timeout` duration.
198+
func (scheduler *Scheduler) StopWithTimeout(timeout time.Duration) {
199+
if !atomic.CompareAndSwapInt32(&scheduler.started, 1, 0) {
200+
return
201+
}
202+
scheduler.stop <- true
203+
if timeout <= 0 {
204+
scheduler.workerPool.Release()
205+
return
206+
}
207+
timer := time.NewTimer(timeout)
208+
defer timer.Stop()
209+
for {
210+
select {
211+
case <-timer.C:
212+
scheduler.workerPool.Release()
213+
return
214+
default:
215+
runningCount := scheduler.runningJobCount()
216+
if runningCount > 0 {
217+
log.Printf("waiting %d jobs to finish...\n", runningCount)
218+
} else {
219+
scheduler.workerPool.Release()
220+
return
221+
}
222+
time.Sleep(10 * time.Millisecond)
223+
}
224+
}
225+
}
226+
197227
// JobStats returns all jobs' statistics in the current scheduler.
198228
func (scheduler *Scheduler) JobStats() map[string][]JobStat {
199229
jobStats := make(map[string][]JobStat, len(scheduler.jobs))

0 commit comments

Comments
 (0)