Commit d8e2ee8

Fix pre-exec/pre-bootstrap hooks
1 parent 8913d7f commit d8e2ee8

agent/job_runner.go

Lines changed: 124 additions & 111 deletions
@@ -348,21 +348,24 @@ type hookExit struct {
 
 func (r *JobRunner) preExecHook(ctx context.Context, hookName string) hookExit {
 	exit := hookExit{Ok: true}
-	if hookPath, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, hookName); hookPath != "" {
-		// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true
-		// positive result from the hook
-		okay, err := r.executePreExecHook(ctx, hookName, hookPath)
-		if !okay {
-			exit.Ok = false
-
-			// Ensure the Job UI knows why this job resulted in failure
-			r.logStreamer.Process([]byte(fmt.Sprintf("%s hook rejected this job, see the buildkite-agent logs for more details", hookName)))
-			// But disclose more information in the agent logs
-			r.logger.Error("%s hook rejected this job: %s", hookName, err)
-
-			exit.ExitStatus = -1
-			exit.SignalReason = "agent_refused"
-		}
+	hookPath, _ := hook.Find(r.conf.AgentConfiguration.HooksPath, hookName)
+	if hookPath == "" {
+		return exit
+	}
+
+	// Once we have a hook any failure to run it MUST be fatal to the job to guarantee a true
+	// positive result from the hook
+	okay, err := r.executePreExecHook(ctx, hookName, hookPath)
+	if !okay {
+		exit.Ok = false
+
+		// Ensure the Job UI knows why this job resulted in failure
+		r.logStreamer.Process([]byte(fmt.Sprintf("%s hook rejected this job, see the buildkite-agent logs for more details", hookName)))
+		// But disclose more information in the agent logs
+		r.logger.Error("%s hook rejected this job: %s", hookName, err)
+
+		exit.ExitStatus = -1
+		exit.SignalReason = "agent_refused"
 	}
 
 	return exit
@@ -403,120 +406,87 @@ func (r *JobRunner) Run(ctx context.Context) error {
 		return err
 	}
 
+	hookExit := r.preExecHooks(ctx)
 	// Before executing the job process with the received Job env,
 	// execute the pre-bootstrap/pre-exec hook (if present) for it to tell us
 	// whether it is happy to proceed.
-	hookExit := r.preExecHook(ctx, "pre-bootstrap")
-	hookExit = r.preExecHook(ctx, "pre-exec")
-
 	// Default exit status is no exit status
 	signal := ""
 	exitStatus := strconv.Itoa(hookExit.ExitStatus)
 	signalReason := hookExit.SignalReason
 
+	var wg sync.WaitGroup
 	// Set up a child context for helper goroutines related to running the job.
 	cctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	var wg sync.WaitGroup
-	if hookExit.Ok {
-		// Kick off log streaming and job status checking when the process starts.
-		wg.Add(2)
-		go r.jobLogStreamer(cctx, &wg)
-		go r.jobCancellationChecker(cctx, &wg)
-
-		// Run the process. This will block until it finishes.
-		if err := r.process.Run(cctx); err != nil {
-			// Send the error as output
-			r.logStreamer.Process([]byte(err.Error()))
-
-			// The process did not run at all, so make sure it fails
-			exitStatus = "-1"
-			signalReason = "process_run_error"
-		} else {
-			// Intended to capture situations where the job-exec (aka bootstrap) container did not
-			// start. Normally such errors are hidden in the Kubernetes events. Let's feed them up
-			// to the user as they may be the caused by errors in the pipeline definition.
-			if r.cancelled && !r.stopped {
-				k8sProcess, ok := r.process.(*kubernetes.Runner)
-				if ok && k8sProcess.ClientStateUnknown() {
-					r.logStreamer.Process([]byte(
-						"Some containers had unknown exit statuses. Perhaps they were in ImagePullBackOff.",
-					))
-				}
-			}
+	defer func() {
+		// Finish the build in the Buildkite Agent API
+		//
+		// Once we tell the API we're finished it might assign us new work, so make
+		// sure everything else is done first.
+		r.finishJob(ctx, startedAt, cancel, &wg, exitStatus, signal, signalReason)
 
-			// Add the final output to the streamer
-			r.logStreamer.Process(r.output.ReadAndTruncate())
+		r.logger.Info("Finished job %s", r.job.ID)
+	}()
 
-			// Collect the finished process' exit status
-			exitStatus = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
-			if ws := r.process.WaitStatus(); ws.Signaled() {
-				signal = process.SignalString(ws.Signal())
-			}
-			if r.stopped {
-				// The agent is being gracefully stopped, and we signaled the job to end. Often due
-				// to pending host shutdown or EC2 spot instance termination
-				signalReason = "agent_stop"
-			} else if r.cancelled {
-				// The job was signaled because it was cancelled via the buildkite web UI
-				signalReason = "cancel"
-			}
-		}
+	if !hookExit.Ok {
+		return nil
 	}
 
-	// Store the finished at time
-	finishedAt := time.Now()
-
-	// Stop the header time streamer. This will block until all the chunks
-	// have been uploaded
-	r.headerTimesStreamer.Stop()
+	// Kick off log streaming and job status checking when the process starts.
+	wg.Add(2)
+	go r.jobLogStreamer(cctx, &wg)
+	go r.jobCancellationChecker(cctx, &wg)
 
-	// Stop the log streamer. This will block until all the chunks have
-	// been uploaded
-	r.logStreamer.Stop()
+	// Run the process. This will block until it finishes.
+	if err := r.process.Run(cctx); err != nil {
+		// Send the error as output
+		r.logStreamer.Process([]byte(err.Error()))
 
-	// Warn about failed chunks
-	if count := r.logStreamer.FailedChunks(); count > 0 {
-		r.logger.Warn("%d chunks failed to upload for this job", count)
-	}
-
-	// Ensure the additional goroutines are stopped.
-	cancel()
+		// The process did not run at all, so make sure it fails
+		exitStatus = "-1"
+		signalReason = "process_run_error"
+	} else {
+		// Intended to capture situations where the job-exec (aka bootstrap) container did not
+		// start. Normally such errors are hidden in the Kubernetes events. Let's feed them up
+		// to the user as they may be the caused by errors in the pipeline definition.
+		if r.cancelled && !r.stopped {
+			k8sProcess, ok := r.process.(*kubernetes.Runner)
+			if ok && k8sProcess.ClientStateUnknown() {
+				r.logStreamer.Process([]byte(
+					"Some containers had unknown exit statuses. Perhaps they were in ImagePullBackOff.",
+				))
+			}
+		}
 
-	// Wait for the routines that we spun up to finish
-	r.logger.Debug("[JobRunner] Waiting for all other routines to finish")
-	wg.Wait()
+		// Add the final output to the streamer
+		r.logStreamer.Process(r.output.ReadAndTruncate())
 
-	// Remove the env file, if any
-	if r.envFile != nil {
-		if err := os.Remove(r.envFile.Name()); err != nil {
-			r.logger.Warn("[JobRunner] Error cleaning up env file: %s", err)
+		// Collect the finished process' exit status
+		exitStatus = fmt.Sprintf("%d", r.process.WaitStatus().ExitStatus())
+		if ws := r.process.WaitStatus(); ws.Signaled() {
+			signal = process.SignalString(ws.Signal())
+		}
+		if r.stopped {
+			// The agent is being gracefully stopped, and we signaled the job to end. Often due
+			// to pending host shutdown or EC2 spot instance termination
+			signalReason = "agent_stop"
+		} else if r.cancelled {
+			// The job was signaled because it was cancelled via the buildkite web UI
+			signalReason = "cancel"
 		}
-		r.logger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name())
 	}
 
-	// Write some metrics about the job run
-	jobMetrics := r.metrics.With(metrics.Tags{
-		"exit_code": exitStatus,
-	})
-	if exitStatus == "0" {
-		jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
-		jobMetrics.Count("jobs.success", 1)
-	} else {
-		jobMetrics.Timing("jobs.duration.error", finishedAt.Sub(startedAt))
-		jobMetrics.Count("jobs.failed", 1)
-	}
+	return nil
+}
 
-	// Finish the build in the Buildkite Agent API
-	//
-	// Once we tell the API we're finished it might assign us new work, so make
-	// sure everything else is done first.
-	r.finishJob(ctx, finishedAt, exitStatus, signal, signalReason, r.logStreamer.FailedChunks())
+func (r *JobRunner) preExecHooks(ctx context.Context) hookExit {
+	hookExit := r.preExecHook(ctx, "pre-bootstrap")
+	if !hookExit.Ok {
+		return hookExit
+	}
 
-	r.logger.Info("Finished job %s", r.job.ID)
+	return r.preExecHook(ctx, "pre-exec")
 
-	return nil
 }
 
 func (r *JobRunner) CancelAndStop() error {
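
The behavioural fix in this hunk is that a pre-bootstrap rejection is no longer overwritten by the pre-exec result: the new preExecHooks helper only runs the second hook when the first one accepted the job. A minimal standalone sketch of that short-circuit (illustrative stand-in names, not the repository's code):

package main

import "fmt"

type hookExit struct {
	Ok           bool
	ExitStatus   int
	SignalReason string
}

// runHook stands in for JobRunner.preExecHook; the accept flag fakes the
// hook's accept/reject decision.
func runHook(name string, accept bool) hookExit {
	if accept {
		return hookExit{Ok: true}
	}
	return hookExit{Ok: false, ExitStatus: -1, SignalReason: "agent_refused"}
}

// preExecHooks mirrors the chaining introduced by the commit: pre-exec only
// runs if pre-bootstrap accepted the job.
func preExecHooks(preBootstrapOK, preExecOK bool) hookExit {
	if exit := runHook("pre-bootstrap", preBootstrapOK); !exit.Ok {
		return exit // short-circuit: a rejection is returned as-is
	}
	return runHook("pre-exec", preExecOK)
}

func main() {
	fmt.Printf("%+v\n", preExecHooks(false, true)) // pre-bootstrap rejection wins
	fmt.Printf("%+v\n", preExecHooks(true, true))  // both hooks accepted
}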
@@ -748,11 +718,11 @@ func (r *JobRunner) executePreExecHook(ctx context.Context, hookName, hookPath s
 	}
 
 	if err := sh.RunWithoutPrompt(ctx, hookPath); err != nil {
-		r.logger.Error("Finished %s hook %q: hookName, job rejected", hookPath)
+		r.logger.Error("Finished %s hook %q: job rejected", hookName, hookPath)
 		return false, err
 	}
 
-	r.logger.Info("Finished %s hook %q: hookName, job accepted", hookPath)
+	r.logger.Info("Finished %s hook %q: job accepted", hookName, hookPath)
 	return true, nil
 }
 
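The logging fix above corrects a format-verb/argument mismatch: previously hookName was baked into the format string and only hookPath was passed, leaving the %q verb without an argument. A small sketch of the difference, assuming the logger formats its arguments fmt-style (the hook path shown is made up):

package main

import "fmt"

func main() {
	hookName, hookPath := "pre-exec", "/etc/buildkite-agent/hooks/pre-exec"

	// Before: only one argument for two verbs, so %q prints a missing-argument marker.
	fmt.Println(fmt.Sprintf("Finished %s hook %q: hookName, job accepted", hookPath))
	// -> Finished /etc/buildkite-agent/hooks/pre-exec hook %!q(MISSING): hookName, job accepted

	// After: both verbs get an argument.
	fmt.Println(fmt.Sprintf("Finished %s hook %q: job accepted", hookName, hookPath))
	// -> Finished pre-exec hook "/etc/buildkite-agent/hooks/pre-exec": job accepted
}
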
@@ -786,18 +756,61 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {
 
 // finishJob finishes the job in the Buildkite Agent API. If the FinishJob call
 // cannot return successfully, this will retry for a long time.
-func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exitStatus, signal, signalReason string, failedChunkCount int) error {
+func (r *JobRunner) finishJob(ctx context.Context, startedAt time.Time, cancel func(), wg *sync.WaitGroup, exitStatus, signal, signalReason string) error {
+	finishedAt := time.Now()
+
+	// Stop the header time streamer. This will block until all the chunks
+	// have been uploaded
+	r.headerTimesStreamer.Stop()
+
+	// Stop the log streamer. This will block until all the chunks have
+	// been uploaded
+	r.logStreamer.Stop()
+
+	// Warn about failed chunks
+	if count := r.logStreamer.FailedChunks(); count > 0 {
+		r.logger.Warn("%d chunks failed to upload for this job", count)
+	}
+
+	// Ensure the additional goroutines are stopped.
+	cancel()
+
+	// Wait for the routines that we spun up to finish
+	r.logger.Debug("[JobRunner] Waiting for all other routines to finish")
+	wg.Wait()
+
+	// Remove the env file, if any
+	if r.envFile != nil {
+		if err := os.Remove(r.envFile.Name()); err != nil {
+			r.logger.Warn("[JobRunner] Error cleaning up env file: %s", err)
+		}
+		r.logger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name())
+	}
+
+	// Write some metrics about the job run
+	jobMetrics := r.metrics.With(metrics.Tags{
+		"exit_code": exitStatus,
+	})
+
+	if exitStatus == "0" {
+		jobMetrics.Timing("jobs.duration.success", finishedAt.Sub(startedAt))
+		jobMetrics.Count("jobs.success", 1)
+	} else {
+		jobMetrics.Timing("jobs.duration.error", finishedAt.Sub(startedAt))
+		jobMetrics.Count("jobs.failed", 1)
+	}
+
 	r.job.FinishedAt = finishedAt.UTC().Format(time.RFC3339Nano)
 	r.job.ExitStatus = exitStatus
 	r.job.Signal = signal
 	r.job.SignalReason = signalReason
-	r.job.ChunksFailedCount = failedChunkCount
+	r.job.ChunksFailedCount = r.logStreamer.FailedChunks()
 
 	r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s",
 		r.job.ExitStatus, r.job.Signal, r.job.SignalReason)
 
-	ctx, cancel := context.WithTimeout(ctx, 48*time.Hour)
-	defer cancel()
+	ctx, ccancel := context.WithTimeout(ctx, 48*time.Hour)
+	defer ccancel()
 
 	return roko.NewRetrier(
 		roko.TryForever(),
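
finishJob now performs the whole teardown (stopping the streamers, cancelling and waiting for the helper goroutines, removing the env file, emitting metrics) and, per the earlier hunk, is called from a defer in Run, so it also runs on the early return taken when a hook rejects the job. A minimal sketch of that deferred-teardown pattern with stand-in names (not the repository's code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// finish stands in for JobRunner.finishJob: it stops helper goroutines, waits
// for them, and only then reports the result.
func finish(startedAt time.Time, cancel context.CancelFunc, wg *sync.WaitGroup, exitStatus string) {
	cancel()  // stop helper goroutines
	wg.Wait() // make sure they have exited before reporting
	fmt.Printf("finished: exit_status=%s duration=%s\n", exitStatus, time.Since(startedAt).Round(time.Millisecond))
}

// run stands in for JobRunner.Run: finish is deferred before any early return,
// so a hook rejection still reports a result.
func run(ctx context.Context, hookOK bool) error {
	startedAt := time.Now()
	exitStatus := "-1"

	var wg sync.WaitGroup
	cctx, cancel := context.WithCancel(ctx)
	defer func() { finish(startedAt, cancel, &wg, exitStatus) }()

	if !hookOK {
		return nil // early return: the deferred finish still runs
	}

	wg.Add(1)
	go func() { defer wg.Done(); <-cctx.Done() }() // stand-in for log streaming / cancellation checking

	exitStatus = "0"
	return nil
}

func main() {
	_ = run(context.Background(), false)
	_ = run(context.Background(), true)
}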
