-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcommand_processor.go
More file actions
143 lines (112 loc) · 3.05 KB
/
command_processor.go
File metadata and controls
143 lines (112 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"time"
"github.com/apex/log"
)
const (
// DefaultPhysicsLoopInterval is the default interval at which physics
// computations occur: one tenth of DefaultStateUpdateLoopInterval.
// NewCommandProcessor uses it as the processor's tick Interval.
DefaultPhysicsLoopInterval = DefaultStateUpdateLoopInterval / 10.0
// DefaultCommandSizeBuffer is the default number of commands that may be
// buffered before a flush is forced. NewCommandProcessor uses it for
// BufferSize, for the capacity of the incoming command channel, and for the
// initial capacity of the pending-command slice.
DefaultCommandSizeBuffer = 200
)
// NewCommandProcessor builds a CommandProcessor bound to the given game state
// and populated with the package defaults: DefaultPhysicsLoopInterval for the
// tick interval and DefaultCommandSizeBuffer for the flush threshold, the
// command channel capacity and the initial buffer capacity. The logger is
// derived from the game state's logger and tagged with the module name.
//
// The processor is returned by value; Loop must be started by the caller.
func NewCommandProcessor(gs *GameState) CommandProcessor {
	var cp CommandProcessor

	// Logging and the state that flushed batches are delivered to.
	cp.Log = gs.Log.WithField("module", "CommandProcessor")
	cp.State = gs

	// Tunables.
	cp.Interval = DefaultPhysicsLoopInterval
	cp.BufferSize = DefaultCommandSizeBuffer

	// Unbuffered signalling channels used by the Close/Loop handshake.
	cp.shutdown = make(chan struct{})
	cp.quit = make(chan struct{})

	// Incoming command channel and the batch accumulated between flushes.
	cp.queue = make(chan Command, DefaultCommandSizeBuffer)
	cp.queueBuffer = make([]Command, 0, DefaultCommandSizeBuffer)

	return cp
}
// CommandProcessor receives commands via its queue channel, batches them, and
// sends the batches to the game state's simulate channel for simulation and
// transmission.
type CommandProcessor struct {
// Log is the logger used for the processor's debug output.
Log log.Interface
// Interval is the period of the ticker that drives Loop.
Interval time.Duration
// BufferSize is the number of buffered commands at which Loop flushes
// immediately instead of waiting for the next tick.
BufferSize int
// State is the game state whose simulate channel receives flushed batches.
State *GameState
// shutdown is signalled by Loop once it has drained and flushed; Close
// waits on it.
shutdown chan struct{}
// quit is signalled by Close to ask Loop to stop.
quit chan struct{}
// queue carries incoming commands: written by Queue, read by Loop, and
// closed by Close.
queue chan Command
// queueBuffer holds the commands accumulated since the last flush.
queueBuffer []Command
// queueSize counts the commands in queueBuffer; Add increments it and
// Flush resets it to zero.
queueSize int
}
// Add appends a single command straight onto the pending batch, bypassing the
// queue channel, and bumps the pending-command counter.
func (c *CommandProcessor) Add(command Command) {
	c.Log.Debug("add")

	// Record the command and keep the running count in step with the buffer.
	c.queueBuffer, c.queueSize = append(c.queueBuffer, command), c.queueSize+1
}
// Flush hands the pending batch of commands to the game state's simulate
// channel and then resets the processor's buffer and counter. The send happens
// first, so the buffer is only cleared once the batch has been accepted.
func (c *CommandProcessor) Flush() {
	c.Log.Debug("flush")

	// Deliver the accumulated batch for simulation.
	batch := c.queueBuffer
	c.State.simulate <- batch

	// The slice now belongs to the receiver; start over with an empty buffer.
	c.queueBuffer, c.queueSize = nil, 0
}
// Loop is the processor's event loop. It runs until Close signals quit and
// reacts to three events:
//
//   - an incoming command is buffered, and the buffer is flushed as soon as it
//     reaches BufferSize;
//   - on every tick any buffered commands are flushed, after which a nil batch
//     is always sent on the simulate channel;
//   - on quit the ticker is stopped, the queue channel is drained until it is
//     closed, any remaining commands are flushed, and shutdown is signalled.
func (c *CommandProcessor) Loop() {
	ticker := time.NewTicker(c.Interval)

	// flushPending hands buffered commands to the game state, if there are any.
	flushPending := func() {
		if c.queueSize > 0 {
			c.Flush()
		}
	}

	for {
		select {
		case <-c.quit:
			c.Log.Debug("quit")

			// No further ticks are needed once we are shutting down.
			ticker.Stop()

			// Drain whatever is still queued; this ends when the channel is
			// closed.
			for cmd := range c.queue {
				c.Add(cmd)
			}
			flushPending()

			// Tell the waiting closer that we are done, then leave the loop.
			c.shutdown <- struct{}{}
			return

		case <-ticker.C:
			flushPending()
			c.State.simulate <- nil

		case cmd := <-c.queue:
			c.Log.Debug("new command")
			c.Add(cmd)

			// Keep accumulating until the buffer is full.
			if c.queueSize < c.BufferSize {
				continue
			}
			c.Flush()
		}
	}
}
// Queue submits a command to the processor by sending it on the queue channel,
// from which Loop picks it up. The send blocks while the channel is full, and
// because Close closes that channel, calling Queue after Close panics.
func (c *CommandProcessor) Queue(command Command) {
	c.Log.Debug("queue")

	c.queue <- command
}
// Close stops the processor's loop and waits for it to finish. It blocks until
// Loop has acknowledged the quit signal, drained the queue channel, and
// signalled shutdown.
func (c *CommandProcessor) Close() {
c.Log.Debug("close")
// ask Loop to stop; this send completes only once Loop receives it
c.quit <- struct{}{}
// close the queue so Loop's drain of the remaining commands terminates
close(c.queue)
// wait for Loop to report that it has flushed and exited
<-c.shutdown
}