@@ -3,7 +3,9 @@ use crate::task::{run_task, Status, Task, TaskCmd, TaskResponse};
3
3
use chrono:: prelude:: * ;
4
4
use chrono:: Utc ;
5
5
use futures:: future:: join_all;
6
+ use futures:: StreamExt ;
6
7
use log:: { debug, error, info} ;
8
+ use std:: sync:: { Arc , Mutex } ;
7
9
use std:: time:: Duration ;
8
10
use tokio:: sync:: { mpsc, oneshot} ;
9
11
use tokio:: task:: JoinHandle ;
@@ -160,28 +162,27 @@ where
160
162
let mut receivers: Vec < oneshot:: Receiver < TaskResponse > > = Vec :: new ( ) ;
161
163
162
164
for handle in & self . handles {
163
- let ( send, recv) = oneshot:: channel ( ) ;
164
- let msg = TaskCmd :: Run { sender : send } ;
165
- let _ = handle. sender . send ( msg) . await ;
165
+ let ( sender, recv) = oneshot:: channel ( ) ;
166
+ let _ = handle. sender . send ( TaskCmd :: Run { sender } ) . await ;
166
167
receivers. push ( recv) ;
167
168
}
168
169
169
- let mut err_no: usize = 0 ;
170
- let mut total_runs: usize = 0 ;
171
- for recv in receivers {
172
- let task_response = recv . await ;
173
- let task_response = task_response . unwrap ( ) ;
174
- match task_response . status {
175
- Status :: Executed => {
176
- total_runs += 1 ;
177
- }
178
- Status :: Failed => {
179
- err_no += 1 ;
180
- total_runs += 1 ;
181
- }
182
- _ => { }
183
- }
184
- }
170
+ let err_no: Arc < Mutex < usize > > = Arc :: new ( Mutex :: new ( 0usize ) ) ;
171
+ let total_runs: Arc < Mutex < usize > > = Arc :: new ( Mutex :: new ( 0usize ) ) ;
172
+ futures :: stream :: iter ( receivers)
173
+ . for_each ( |r| async {
174
+ match r . await . unwrap ( ) . status {
175
+ Status :: Executed => {
176
+ * total_runs . lock ( ) . unwrap ( ) += 1 ;
177
+ }
178
+ Status :: Failed => {
179
+ * err_no . lock ( ) . unwrap ( ) += 1 ;
180
+ * total_runs . lock ( ) . unwrap ( ) += 1 ;
181
+ }
182
+ _ => { /* Do nothing */ }
183
+ } ;
184
+ } )
185
+ . await ;
185
186
186
187
// Send for reschedule
187
188
receivers = Vec :: new ( ) ;
@@ -193,26 +194,25 @@ where
193
194
}
194
195
195
196
for recv in receivers {
196
- let res = recv. await ;
197
- let res1 = res. unwrap ( ) ;
198
- if res1. status == Status :: Finished {
197
+ let res = recv. await . unwrap ( ) ;
198
+ if res. status == Status :: Finished {
199
199
for handle in & self . handles {
200
- if handle. id == res1 . id {
201
- debug ! ( "Killing task {} due to end of execution circle." , res1 . id) ;
200
+ if handle. id == res . id {
201
+ debug ! ( "Killing task {} due to end of execution circle." , res . id) ;
202
202
handle. handle . abort ( ) ;
203
203
}
204
204
}
205
- let index = self . handles . iter ( ) . position ( |x| x. id == res1 . id ) . unwrap ( ) ;
205
+ let index = self . handles . iter ( ) . position ( |x| x. id == res . id ) . unwrap ( ) ;
206
206
self . handles . remove ( index) ;
207
207
}
208
208
}
209
209
210
210
// Build the response
211
- if total_runs > 0 {
212
- if err_no == 0 {
213
- ExecutionStatus :: Success ( total_runs)
211
+ if * total_runs. lock ( ) . unwrap ( ) > 0 {
212
+ if * err_no. lock ( ) . unwrap ( ) == 0 {
213
+ ExecutionStatus :: Success ( * total_runs. lock ( ) . unwrap ( ) )
214
214
} else {
215
- ExecutionStatus :: HadError ( total_runs, err_no)
215
+ ExecutionStatus :: HadError ( * total_runs. lock ( ) . unwrap ( ) , * err_no. lock ( ) . unwrap ( ) )
216
216
}
217
217
} else {
218
218
ExecutionStatus :: NoExecution
@@ -227,9 +227,8 @@ where
227
227
// Send init signal to all the tasks that are not initialized yet.
228
228
for handle in & self . handles {
229
229
if !handle. is_init {
230
- let ( send, recv) = oneshot:: channel ( ) ;
231
- let msg = TaskCmd :: Init { sender : send } ;
232
- let _ = handle. sender . send ( msg) . await ;
230
+ let ( sender, recv) = oneshot:: channel ( ) ;
231
+ let _ = handle. sender . send ( TaskCmd :: Init { sender } ) . await ;
233
232
receivers. push ( recv) ;
234
233
count += 1 ;
235
234
}
@@ -321,6 +320,8 @@ where
321
320
#[ cfg( test) ]
322
321
mod test {
323
322
use super :: * ;
323
+ use crate :: task:: TaskStepStatusErr :: Error ;
324
+ use crate :: task:: TaskStepStatusOk :: Success ;
324
325
use crate :: TaskBuilder ;
325
326
use chrono:: Local ;
326
327
use std:: time:: Duration ;
@@ -351,9 +352,9 @@ mod test {
351
352
352
353
// Create a task.
353
354
let mut task = Task :: new ( "* * * * * * *" , None , Some ( 1 ) , Local ) ;
354
- task. add_step_default ( || Ok ( ( ) ) ) ;
355
+ task. add_step_default ( || Ok ( Success ) ) ;
355
356
// Return an error in the second step.
356
- task. add_step_default ( || Err ( ( ) ) ) ;
357
+ task. add_step_default ( || Err ( Error ( None ) ) ) ;
357
358
358
359
// Add a task.
359
360
scheduler. add_task ( task) ;
0 commit comments