Skip to content

Commit 432c3cf

Browse files
committed
Use Agent API for locking mirrors and known_hosts
1 parent f19bb4b commit 432c3cf

File tree

9 files changed

+103
-37
lines changed

9 files changed

+103
-37
lines changed

agent/job_runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ func (w LogWriter) Write(bytes []byte) (int, error) {
732732
func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (bool, error) {
733733
r.logger.Info("Running pre-bootstrap hook %q", hook)
734734

735-
sh, err := shell.New()
735+
sh, err := shell.New(ctx, shell.Config{})
736736
if err != nil {
737737
return false, err
738738
}

bootstrap/bootstrap.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,16 +70,20 @@ func New(conf Config) *Bootstrap {
7070
func (b *Bootstrap) Run(ctx context.Context) (exitCode int) {
7171
// Check if not nil to allow for tests to overwrite shell
7272
if b.shell == nil {
73-
var err error
74-
b.shell, err = shell.New()
73+
// The Agent API socket could be important for file locking.
74+
cfg := shell.Config{
75+
SocketsPath: b.Config.SocketsPath,
76+
}
77+
sh, err := shell.New(ctx, cfg)
7578
if err != nil {
7679
fmt.Printf("Error creating shell: %v", err)
7780
return 1
7881
}
7982

80-
b.shell.PTY = b.Config.RunInPty
81-
b.shell.Debug = b.Config.Debug
82-
b.shell.InterruptSignal = b.Config.CancelSignal
83+
sh.PTY = b.Config.RunInPty
84+
sh.Debug = b.Config.Debug
85+
sh.InterruptSignal = b.Config.CancelSignal
86+
b.shell = sh
8387
}
8488
if experiments.IsEnabled(experiments.KubernetesExec) {
8589
kubernetesClient := &kubernetes.Client{}

bootstrap/bootstrap_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestStartTracing_NoTracingBackend(t *testing.T) {
8686
b := New(Config{})
8787

8888
oriCtx := context.Background()
89-
b.shell, err = shell.New()
89+
b.shell, err = shell.New(oriCtx, shell.Config{})
9090
assert.NoError(t, err)
9191

9292
span, _, stopper := b.startTracing(oriCtx)
@@ -107,7 +107,7 @@ func TestStartTracing_Datadog(t *testing.T) {
107107
b := New(cfg)
108108

109109
oriCtx := context.Background()
110-
b.shell, err = shell.New()
110+
b.shell, err = shell.New(oriCtx, shell.Config{})
111111
assert.NoError(t, err)
112112

113113
span, ctx, stopper := b.startTracing(oriCtx)

bootstrap/shell/shell.go

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import (
2323
"github.com/opentracing/opentracing-go"
2424

2525
"github.com/buildkite/agent/v3/env"
26+
"github.com/buildkite/agent/v3/experiments"
2627
"github.com/buildkite/agent/v3/internal/shellscript"
28+
"github.com/buildkite/agent/v3/lock"
2729
"github.com/buildkite/agent/v3/logger"
2830
"github.com/buildkite/agent/v3/process"
2931
"github.com/buildkite/agent/v3/tracetools"
@@ -67,23 +69,45 @@ type Shell struct {
6769
cmd *command
6870
cmdLock sync.Mutex
6971

72+
// Lock service client, if available
73+
lockClient *lock.Client
74+
7075
// The signal to use to interrupt the command
7176
InterruptSignal process.Signal
7277
}
7378

74-
// New returns a new Shell
75-
func New() (*Shell, error) {
79+
// Config contains configuration options for the
80+
type Config struct {
81+
SocketsPath string
82+
}
83+
84+
// New returns a new Shell.
85+
func New(ctx context.Context, cfg Config) (*Shell, error) {
7686
wd, err := os.Getwd()
7787
if err != nil {
7888
return nil, fmt.Errorf("Failed to find current working directory: %w", err)
7989
}
8090

81-
return &Shell{
91+
sh := &Shell{
8292
Logger: StderrLogger,
8393
Env: env.FromSlice(os.Environ()),
8494
Writer: os.Stdout,
8595
wd: wd,
86-
}, nil
96+
}
97+
98+
// Use the Agent API for locking?
99+
if cfg.SocketsPath != "" && experiments.IsEnabled(experiments.AgentAPI) {
100+
ctx, canc := context.WithTimeout(ctx, 10*time.Second)
101+
defer canc()
102+
lc, err := lock.NewClient(ctx, cfg.SocketsPath)
103+
if err != nil {
104+
sh.Logger.Errorf("Couldn't use Agent API for locking, so falling back to using flock-based locks: %v", err)
105+
lc = nil
106+
}
107+
sh.lockClient = lc
108+
}
109+
110+
return sh, nil
87111
}
88112

89113
// WithStdin returns a copy of the Shell with the provided io.Reader set as the
@@ -181,8 +205,8 @@ func (s *Shell) WaitStatus() (process.WaitStatus, error) {
181205
return s.cmd.proc.WaitStatus(), nil
182206
}
183207

184-
// LockFile is a pid-based lock for cross-process locking
185-
type LockFile interface {
208+
// Unlocker types can unlock a cross-process lock (such as an flock).
209+
type Unlocker interface {
186210
Unlock() error
187211
}
188212

@@ -222,8 +246,44 @@ func (s *Shell) flock(ctx context.Context, path string, timeout time.Duration) (
222246
return lock, err
223247
}
224248

249+
// agentAPILock contains all the information required to unlock an Agent API
250+
// lock-service lock.
251+
type agentAPILock struct {
252+
client *lock.Client
253+
key, token string
254+
}
255+
256+
func (l *agentAPILock) Unlock() error {
257+
return l.client.Unlock(context.Background(), l.key, l.token)
258+
}
259+
260+
// lockWithAgentAPI acquires a lock in the Agent API lock service.
261+
func (s *Shell) lockWithAgentAPI(ctx context.Context, path string, timeout time.Duration) (*agentAPILock, error) {
262+
absolutePathToLock, err := filepath.Abs(path)
263+
if err != nil {
264+
return nil, fmt.Errorf("Failed to find absolute path to lock \"%s\" (%v)", path, err)
265+
}
266+
267+
ctx, cancel := context.WithTimeout(ctx, timeout)
268+
defer cancel()
269+
270+
token, err := s.lockClient.Lock(ctx, absolutePathToLock)
271+
if err != nil {
272+
return nil, fmt.Errorf("Failed to acquire lock for %q: %v", path, err)
273+
}
274+
275+
return &agentAPILock{
276+
client: s.lockClient,
277+
key: absolutePathToLock,
278+
token: token,
279+
}, err
280+
}
281+
225282
// Create a cross-process file-based lock based on pid files
226-
func (s *Shell) LockFile(ctx context.Context, path string, timeout time.Duration) (LockFile, error) {
283+
func (s *Shell) LockFile(ctx context.Context, path string, timeout time.Duration) (Unlocker, error) {
284+
if s.lockClient != nil {
285+
return s.lockWithAgentAPI(ctx, path, timeout)
286+
}
227287
return s.flock(ctx, path, timeout)
228288
}
229289

bootstrap/shell/shell_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func TestContextCancelTerminates(t *testing.T) {
131131
ctx, cancel := context.WithCancel(context.Background())
132132
defer cancel()
133133

134-
sh, err := shell.New()
134+
sh, err := shell.New(ctx, shell.Config{})
135135
if err != nil {
136136
t.Fatalf("shell.New() error = %v", err)
137137
}
@@ -165,7 +165,7 @@ func TestInterrupt(t *testing.T) {
165165
ctx, cancel := context.WithCancel(context.Background())
166166
defer cancel()
167167

168-
sh, err := shell.New()
168+
sh, err := shell.New(ctx, shell.Config{})
169169
if err != nil {
170170
t.Fatalf("shell.New() error = %v", err)
171171
}
@@ -190,7 +190,7 @@ func TestInterrupt(t *testing.T) {
190190
}
191191

192192
func TestDefaultWorkingDirFromSystem(t *testing.T) {
193-
sh, err := shell.New()
193+
sh, err := shell.New(context.Background(), shell.Config{})
194194
if err != nil {
195195
t.Fatalf("shell.New() error = %v", err)
196196
}
@@ -231,7 +231,7 @@ func TestWorkingDir(t *testing.T) {
231231
t.Fatalf("os.Getwd() error = %v", err)
232232
}
233233

234-
sh, err := shell.New()
234+
sh, err := shell.New(context.Background(), shell.Config{})
235235
if err != nil {
236236
t.Fatalf("shell.New() error = %v", err)
237237
}
@@ -353,7 +353,7 @@ func TestAcquiringLockHelperProcess(t *testing.T) {
353353
}
354354

355355
func newShellForTest(t *testing.T) *shell.Shell {
356-
sh, err := shell.New()
356+
sh, err := shell.New(context.Background(), shell.Config{})
357357
if err != nil {
358358
t.Fatalf("shell.New() error = %v", err)
359359
}
@@ -362,7 +362,7 @@ func newShellForTest(t *testing.T) *shell.Shell {
362362
}
363363

364364
func TestRunWithoutPrompt(t *testing.T) {
365-
sh, err := shell.New()
365+
sh, err := shell.New(context.Background(), shell.Config{})
366366
if err != nil {
367367
t.Fatalf("shell.New() error = %v", err)
368368
}

bootstrap/shell/test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package shell
22

33
import (
4+
"context"
45
"io"
56
"os"
67
"runtime"
@@ -11,7 +12,7 @@ import (
1112

1213
// NewTestShell creates a minimal shell suitable for tests.
1314
func NewTestShell(t *testing.T) *Shell {
14-
sh, err := New()
15+
sh, err := New(context.Background(), Config{})
1516
if err != nil {
1617
t.Fatalf("shell.New() error = %v", err)
1718
}

bootstrap/ssh_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func init() {
1818
func TestFindingSSHTools(t *testing.T) {
1919
t.Parallel()
2020

21-
sh, err := shell.New()
21+
sh, err := shell.New(context.Background(), shell.Config{})
2222
if err != nil {
2323
t.Fatalf("shell.New() error = %v", err)
2424
}

clicommand/agent_start.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -963,10 +963,10 @@ var AgentStartCommand = cli.Command{
963963
pool := agent.NewAgentPool(workers)
964964

965965
// Agent-wide shutdown hook. Once per agent, for all workers on the agent.
966-
defer agentShutdownHook(l, cfg)
966+
defer agentShutdownHook(ctx, l, cfg)
967967

968968
// Once the shutdown hook has been setup, trigger the startup hook.
969-
if err := agentStartupHook(l, cfg); err != nil {
969+
if err := agentStartupHook(ctx, l, cfg); err != nil {
970970
l.Fatal("%s", err)
971971
}
972972

@@ -1052,17 +1052,17 @@ func handlePoolSignals(ctx context.Context, l logger.Logger, pool *agent.AgentPo
10521052
return signals
10531053
}
10541054

1055-
func agentStartupHook(log logger.Logger, cfg AgentStartConfig) error {
1056-
return agentLifecycleHook("agent-startup", log, cfg)
1055+
func agentStartupHook(ctx context.Context, log logger.Logger, cfg AgentStartConfig) error {
1056+
return agentLifecycleHook(ctx, "agent-startup", log, cfg)
10571057
}
1058-
func agentShutdownHook(log logger.Logger, cfg AgentStartConfig) {
1059-
_ = agentLifecycleHook("agent-shutdown", log, cfg)
1058+
func agentShutdownHook(ctx context.Context, log logger.Logger, cfg AgentStartConfig) {
1059+
_ = agentLifecycleHook(ctx, "agent-shutdown", log, cfg)
10601060
}
10611061

10621062
// agentLifecycleHook looks for a hook script in the hooks path
10631063
// and executes it if found. Output (stdout + stderr) is streamed into the main
10641064
// agent logger. Exit status failure is logged and returned for the caller to handle
1065-
func agentLifecycleHook(hookName string, log logger.Logger, cfg AgentStartConfig) error {
1065+
func agentLifecycleHook(ctx context.Context, hookName string, log logger.Logger, cfg AgentStartConfig) error {
10661066
// search for hook (including .bat & .ps1 files on Windows)
10671067
p, err := hook.Find(cfg.HooksPath, hookName)
10681068
if err != nil {
@@ -1072,7 +1072,7 @@ func agentLifecycleHook(hookName string, log logger.Logger, cfg AgentStartConfig
10721072
}
10731073
return nil
10741074
}
1075-
sh, err := shell.New()
1075+
sh, err := shell.New(ctx, shell.Config{})
10761076
if err != nil {
10771077
log.Error("creating shell for %q hook: %v", hookName, err)
10781078
return err

clicommand/agent_start_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package clicommand
22

33
import (
4+
"context"
45
"os"
56
"path/filepath"
67
"runtime"
@@ -50,7 +51,7 @@ func TestAgentStartupHook(t *testing.T) {
5051
defer closer()
5152
filepath := writeAgentHook(t, hooksPath, "agent-startup")
5253
log := logger.NewBuffer()
53-
err := agentStartupHook(log, cfg(hooksPath))
54+
err := agentStartupHook(context.Background(), log, cfg(hooksPath))
5455

5556
if assert.NoError(t, err, log.Messages) {
5657
assert.Equal(t, []string{
@@ -64,14 +65,14 @@ func TestAgentStartupHook(t *testing.T) {
6465
defer closer()
6566

6667
log := logger.NewBuffer()
67-
err := agentStartupHook(log, cfg(hooksPath))
68+
err := agentStartupHook(context.Background(), log, cfg(hooksPath))
6869
if assert.NoError(t, err, log.Messages) {
6970
assert.Equal(t, []string{}, log.Messages)
7071
}
7172
})
7273
t.Run("with bad hooks path", func(t *testing.T) {
7374
log := logger.NewBuffer()
74-
err := agentStartupHook(log, cfg("zxczxczxc"))
75+
err := agentStartupHook(context.Background(), log, cfg("zxczxczxc"))
7576

7677
if assert.NoError(t, err, log.Messages) {
7778
assert.Equal(t, []string{}, log.Messages)
@@ -95,7 +96,7 @@ func TestAgentShutdownHook(t *testing.T) {
9596
defer closer()
9697
filepath := writeAgentHook(t, hooksPath, "agent-shutdown")
9798
log := logger.NewBuffer()
98-
agentShutdownHook(log, cfg(hooksPath))
99+
agentShutdownHook(context.Background(), log, cfg(hooksPath))
99100

100101
assert.Equal(t, []string{
101102
"[info] " + prompt + " " + filepath, // prompt
@@ -107,12 +108,12 @@ func TestAgentShutdownHook(t *testing.T) {
107108
defer closer()
108109

109110
log := logger.NewBuffer()
110-
agentShutdownHook(log, cfg(hooksPath))
111+
agentShutdownHook(context.Background(), log, cfg(hooksPath))
111112
assert.Equal(t, []string{}, log.Messages)
112113
})
113114
t.Run("with bad hooks path", func(t *testing.T) {
114115
log := logger.NewBuffer()
115-
agentShutdownHook(log, cfg("zxczxczxc"))
116+
agentShutdownHook(context.Background(), log, cfg("zxczxczxc"))
116117
assert.Equal(t, []string{}, log.Messages)
117118
})
118119
}

0 commit comments

Comments
 (0)