Commit ff54f2d

feat: Add system job scheduler with pluggable trigger and overlap policies

Introduces a new internal/scheduler package that replaces the ad-hoc inventory
sync and leak detection goroutines with a structured scheduling framework.

- Scheduler entries are registered with the scheduler when the RLA service is
  started.
- Each entry is defined with a Trigger (Timer, Cron, Trigger-once,
  Event-triggered), a Policy, and a Job.
- The scheduler has three components to handle each registered entry:
  - Emitter: emits events based on the Trigger definition.
  - Relay: handles events from the Emitter and delivers them to workers based
    on the Policy definition.
  - Worker: runs the job based on the Job definition.
- Provides graceful and forceful shutdown for the scheduler.
- The architecture is described in docs/scheduler-architecture.md.

Also refactors the service start code so that the configuration file is loaded
only once.

Signed-off-by: Jin Wang <jinwan@nvidia.com>
1 parent 16e2f81 commit ff54f2d

21 files changed

Lines changed: 2499 additions & 146 deletions

rla/cmd/serve.go

Lines changed: 9 additions & 4 deletions

```diff
@@ -31,6 +31,7 @@ import (
 	cdb "github.com/NVIDIA/ncx-infra-controller-rest/db/pkg/db"
 	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/carbideapi"
+	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/config"
 	svc "github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/service"
 	"github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/componentmanager"
 	computecarbide "github.com/NVIDIA/ncx-infra-controller-rest/rla/internal/task/componentmanager/compute/carbide"
@@ -215,6 +216,8 @@ func loadComponentManagerConfig() (componentmanager.Config, error) {
 // configuration, initialises provider and component manager registries, builds
 // the service, and blocks until a termination signal is received.
 func doServe() {
+	rlaConfig := config.ReadConfig()
+
 	dbConf, err := cdb.ConfigFromEnv()
 	if err != nil {
 		log.Fatal().Msgf("failed to retrieve DB conn information: %v", err)
@@ -286,10 +289,12 @@ func doServe() {
 	service, err := svc.New(
 		ctx,
 		svc.Config{
-			Port:         port,
-			DBConf:       dbConf,
-			ExecutorConf: &temporalManagerConf,
-			CMConfig:     cmConfig,
+			Port:             port,
+			DBConf:           dbConf,
+			ExecutorConf:     &temporalManagerConf,
+			RLAConfig:        rlaConfig,
+			CMConfig:         cmConfig,
+			ProviderRegistry: providerRegistry,
 			CertConfig: pkgcerts.Config{
 				CACert:  globalCACert,
 				TLSCert: globalTLSCert,
```

rla/docs/scheduler-architecture.md

Lines changed: 264 additions & 0 deletions

# Scheduler Architecture

Internal job scheduling framework for RLA. Handles time-based, startup, and
event-driven jobs with configurable overlap policies.

---

## Package Structure

```text
internal/scheduler/
  scheduler.go  — Scheduler: Schedule, Start, Stop
  entry.go      — entry (job+trigger+policy+channels), workItem
  relay.go      — relay: g1 (intake) + g2 (dispatch) pipeline
  dispatcher.go — dispatcher interface + Skip/Queue/QueueAll/Replace
  worker.go     — worker: executes jobs sequentially from workCh
  types/
    job.go      — Job interface: Name(), Run(ctx, Event)
    event.go    — EventType, Event{Type, Payload}
    policy.go   — Policy: Skip, Queue, QueueAll, Replace
    trigger.go  — Trigger interface + IntervalTrigger, CronTrigger,
                  OnceTrigger, EventTrigger
  jobs/
    inventorysync/ — InventorySync job implementation
    leakdetection/ — LeakDetection job implementation
```
---

## Component Overview

```mermaid
graph TB
    subgraph Triggers["Trigger (types/trigger.go)"]
        IT["IntervalTrigger"]
        CT["CronTrigger"]
        OT["OnceTrigger"]
        ET["EventTrigger"]
    end

    subgraph Scheduler["Scheduler (scheduler.go)"]
        S["Scheduler\nSchedule / Start / Stop"]
    end

    subgraph Entry["Per-Entry (entry.go)"]
        E["entry\njob + trigger + policy\neventCh · workCh"]
    end

    subgraph RelayBox["Relay (relay.go)"]
        G1["g1 intake\nreads eventCh\nwrites queue\npings notifyCh"]
        Q["queue\n[]Event\n(max 2048)"]
        G2["g2 dispatch\nreads notifyCh\ndelegates to dispatcher"]
        G1 -- "append" --> Q
        G1 -- "ping / close" --> G2
        Q -- "dequeue" --> G2
    end

    subgraph DispatcherBox["Dispatcher (dispatcher.go)"]
        D["Skip / Queue\nQueueAll / Replace"]
    end

    subgraph WorkerBox["Worker (worker.go)"]
        W["worker\nsequential\njob.Run(ctx, ev)"]
    end

    subgraph Jobs["Jobs"]
        IS["InventorySyncJob"]
        LD["LeakDetectionJob"]
    end

    IT & CT & OT & ET --> S
    S --> E
    E -- "eventCh" --> G1
    G2 --> D
    D -- "workCh" --> W
    W --> IS
    W --> LD
```
---

## Per-Entry Pipeline

Each registered job runs in its own isolated pipeline of three goroutines:

```mermaid
flowchart LR
    TR["Trigger\n.Emit()"]
    EC["eventCh\ncap=1"]
    G1["g1 intake"]
    Q["queue\nmax 2048"]
    NC["notifyCh\ncap=1"]
    G2["g2 dispatch"]
    WC["workCh\nunbuffered"]
    W["worker"]

    TR -->|"Event"| EC
    EC -->|"read"| G1
    G1 -->|"append"| Q
    G1 -->|"ping / close"| NC
    NC -->|"wake"| G2
    Q -->|"dequeue"| G2
    G2 -->|"workItem{ctx,ev}"| WC
    WC -->|"read"| W
```

**g1 — intake:** Reads `eventCh`, buffers into `queue` (drops oldest on
overflow), and sends a non-blocking ping to `notifyCh`. On exit, closes
`notifyCh` to signal g2 that no more events will ever arrive.

**g2 — dispatch:** Wakes on `notifyCh`, delegates dequeue and send logic to
the per-policy dispatcher. Exits on `notifyCh` close, `forceCh` close, or
`ctx.Done()`.

**worker:** Reads `workItem` values from `workCh` sequentially, calls
`job.Run(ctx, ev)`. Exits when `workCh` is closed by the relay.
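The drop-oldest buffering and non-blocking ping that g1 performs can be sketched as a standalone model. This is a hypothetical illustration, not the actual internal/scheduler code: `boundedQueue` and `notify` are invented names, and the cap is shrunk from 2048 to 3 for the demo.

```go
package main

import "fmt"

// boundedQueue mimics the relay queue: appending past the cap drops the
// oldest element instead of blocking the intake goroutine.
type boundedQueue struct {
	items []int
	max   int
}

func (q *boundedQueue) push(ev int) {
	if len(q.items) == q.max {
		q.items = q.items[1:] // drop oldest on overflow
	}
	q.items = append(q.items, ev)
}

// notify models g1's ping after each append: if a ping is already pending
// in the cap-1 channel, the new one is silently dropped, so g1 never blocks.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	q := &boundedQueue{max: 3}
	notifyCh := make(chan struct{}, 1)
	for ev := 1; ev <= 5; ev++ {
		q.push(ev)
		notify(notifyCh) // never blocks, even if g2 has not woken yet
	}
	fmt.Println(q.items)       // [3 4 5] — events 1 and 2 were dropped
	fmt.Println(len(notifyCh)) // 1 — at most one ping is ever pending
}
```

A cap-1 notify channel collapses any burst of pings into a single wake-up for g2, which then drains whatever is in the queue.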
---

## Dispatcher Behaviours

```mermaid
flowchart TD
    N(["notifyCh ping"])
    DQ["Dequeue from queue"]
    N --> DQ

    DQ --> SK{"Skip"}
    SK -->|"worker free"| SEND1["send workItem\nregister cancelCurrent"]
    SK -->|"worker busy"| DROP1["drop event"]

    DQ --> QU{"Queue"}
    QU -->|"worker free"| SEND2["send workItem\nregister cancelCurrent"]
    QU -->|"worker busy\nno newer event"| PUT["put event back\nwait for next ping"]
    QU -->|"worker busy\nnewer event exists"| DROP2["drop event"]

    DQ --> QA{"QueueAll"}
    QA --> SNAP["snapshot entire queue\n(single lock)"]
    SNAP --> FIFO["blocking send\nFIFO order"]

    DQ --> RP{"Replace"}
    RP --> CANCEL["cancel running job"]
    CANCEL --> LATEST["take latest event\ndrop all others"]
    LATEST --> SEND3["blocking send\nregister cancelCurrent"]
```

All dispatchers embed `dispatchBase` which holds `cancelCurrent`
(`context.CancelFunc`), allowing `forceStop` to abort the in-flight job
regardless of policy. `cancelCurrent` is registered only after a successful
send to `workCh`.
---

## Lifecycle

`Scheduler` is a **single-use object**. The expected call order is:

1. `Schedule(...)` — register jobs (one or more calls)
2. `Start(ctx)` — launch goroutines; returns an error if called more than once
3. `Stop(force)` — shut down and wait; returns an error if called more than once

Calling `Start` after `Stop` is rejected with an error. Reuse is not supported
because internal channels (`eventCh`, `workCh`, `notifyCh`) are closed during
shutdown and cannot be safely re-opened. Create a new `Scheduler` instance
instead.
---

## Shutdown Sequences

### Graceful — `Stop(force=false)`

```mermaid
sequenceDiagram
    participant SC as Scheduler
    participant G1 as g1 intake
    participant G2 as g2 dispatch
    participant W as worker

    SC->>SC: runCancel()
    G1->>G1: ctx.Done() → drain remaining eventCh items into relay queue
    Note over G1: enqueue pending events, ping notifyCh as needed
    G1->>G2: close(notifyCh)
    Note over G2: Skip/Queue/Replace: return immediately
    Note over G2: QueueAll: flush remaining events first
    G2->>W: close(workCh)
    W->>W: range workCh exits
    SC->>SC: wg.Wait() returns
```

### Force — `Stop(force=true)`

```mermaid
sequenceDiagram
    participant SC as Scheduler
    participant RL as relay
    participant G2 as g2 dispatch
    participant W as worker

    SC->>RL: forceStop()
    RL->>RL: clear queue
    RL->>RL: dispatcher.cancel() — abort running job
    RL->>G2: close(forceCh)
    G2->>G2: exit immediately (no drain)
    SC->>SC: runCancel()
    RL->>W: close(workCh)
    W->>W: range workCh exits
    SC->>SC: wg.Wait() returns
```
---

## Trigger Types

| Trigger | Fires | Exhausted when |
|---------|-------|----------------|
| `IntervalTrigger` | Every fixed duration | ctx cancelled |
| `CronTrigger` | On robfig/cron v1 6-field schedule | ctx cancelled |
| `OnceTrigger` | Exactly once, immediately | After first event sent |
| `EventTrigger` | On each event from an external `<-chan Event` | Source channel closed or ctx cancelled |
---

## Overlap Policy Summary

| Policy | Worker busy behaviour | Event ordering | Use when |
|--------|-----------------------|----------------|----------|
| `Skip` | Drop incoming event | N/A | Job is idempotent; only one concurrent run matters |
| `Queue` | Keep latest, discard rest | Latest only | Polling jobs: next run reads current state anyway |
| `QueueAll` | Buffer all, process FIFO | Strict FIFO | Each event carries unique data; best-effort — oldest events dropped on queue overflow, queue skipped on force-stop |
| `Replace` | Cancel current, run latest | Latest only | Only the most recent trigger is meaningful |
---

## Job Interface

```go
type Job interface {
	Name() string
	Run(ctx context.Context, ev Event) error
}
```

The `Event` is passed directly to `Run` — no context value extraction needed.
For time-based triggers (`IntervalTrigger`, `CronTrigger`, `OnceTrigger`),
`ev` is a zero-value `Event{}`. For `EventTrigger`, `ev` carries the
`Type` and `Payload` from the source channel.
---

## Key Design Decisions

- **Isolated pipeline per entry** — no shared state between jobs; one misbehaving
  trigger cannot block others.
- **g1/g2 split** — g1 owns the queue under a mutex; g2 reads from it. Separating
  intake from dispatch keeps the relay lock out of the blocking send to `workCh`.
- **notifyCh closure as termination signal** — g1 closing `notifyCh` is the
  natural "no more events" signal, eliminating a separate `intakeDone` channel.
- **forceCh for immediate exit** — a dedicated closed channel lets all dispatchers
  (including QueueAll's drain loop) exit without waiting for ctx cancellation.
- **cancelCurrent registered after send** — ensures `forceStop` always targets
  an actual running job, never a context that was never delivered.
- **Event passed directly to Run** — avoids hidden coupling through context values;
  the compiler enforces the contract.
