Skip to content

Commit 45cbbca

Browse files
authored
more active logs (#2470)
* more active logs
* address the memory time bomb
* chunked logs
* fix apply
* refactor so we dedupe
1 parent 9a7d263 commit 45cbbca

File tree

17 files changed

+658
-349
lines changed

17 files changed

+658
-349
lines changed

sandbox-sidecar/src/jobs/jobRunner.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ export class JobRunner {
2323
logger.info({ job: job.id, runner: this.runner.name }, "sandbox job started");
2424

2525
try {
26-
const result = await this.runner.run(job);
26+
const result = await this.runner.run(job, (chunk) => this.store.appendLogs(job.id, chunk));
2727
this.store.updateStatus(job.id, "succeeded", result.logs);
2828
this.store.setResult(job.id, result.result);
2929
logger.info({ job: job.id }, "sandbox job succeeded");
@@ -36,4 +36,3 @@ export class JobRunner {
3636
}
3737
}
3838
}
39-

sandbox-sidecar/src/jobs/jobStore.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,17 @@ export class JobStore {
3939
job.updatedAt = new Date();
4040
}
4141

42+
appendLogs(id: string, chunk: string) {
43+
const job = this.jobs.get(id);
44+
if (!job || !chunk) return;
45+
job.logs = `${job.logs}${chunk}`;
46+
job.updatedAt = new Date();
47+
}
48+
4249
setResult(id: string, result: SandboxRunResult | undefined) {
4350
const job = this.jobs.get(id);
4451
if (!job) return;
4552
job.result = result;
4653
job.updatedAt = new Date();
4754
}
4855
}
49-

sandbox-sidecar/src/runners/e2bRunner.ts

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ export class E2BSandboxRunner implements SandboxRunner {
2323
}
2424
}
2525

26-
async run(job: SandboxRunRecord): Promise<RunnerOutput> {
26+
async run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
2727
if (job.payload.operation === "plan") {
28-
return this.runPlan(job);
28+
return this.runPlan(job, appendLog);
2929
}
30-
return this.runApply(job);
30+
return this.runApply(job, appendLog);
3131
}
3232

33-
private async runPlan(job: SandboxRunRecord): Promise<RunnerOutput> {
33+
private async runPlan(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
3434
const requestedVersion = job.payload.terraformVersion || "1.5.7";
3535
const requestedEngine = job.payload.engine || "terraform";
3636
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
@@ -42,28 +42,32 @@ export class E2BSandboxRunner implements SandboxRunner {
4242

4343
const workDir = await this.setupWorkspace(sandbox, job);
4444
const logs: string[] = [];
45+
const streamLog = (chunk: string) => {
46+
if (!chunk) return;
47+
appendLog?.(chunk);
48+
};
4549

4650
// Run terraform init
4751
await this.runTerraformCommand(
4852
sandbox,
4953
workDir,
5054
["init", "-input=false", "-no-color"],
5155
logs,
56+
streamLog,
5257
);
5358

5459
// Run terraform plan
5560
const planArgs = ["plan", "-input=false", "-no-color", "-out=tfplan.binary"];
5661
if (job.payload.isDestroy) {
5762
planArgs.splice(1, 0, "-destroy");
5863
}
59-
await this.runTerraformCommand(sandbox, workDir, planArgs, logs);
64+
await this.runTerraformCommand(sandbox, workDir, planArgs, logs, streamLog);
6065

6166
// Get plan JSON
6267
const showResult = await this.runTerraformCommand(
6368
sandbox,
6469
workDir,
6570
["show", "-json", "tfplan.binary"],
66-
logs,
6771
);
6872

6973
const planJSON = showResult.stdout;
@@ -76,31 +80,36 @@ export class E2BSandboxRunner implements SandboxRunner {
7680
planJSON: Buffer.from(planJSON, "utf8").toString("base64"),
7781
};
7882

79-
return { logs: logs.join("\n"), result };
83+
return { logs: logs.join(""), result };
8084
} finally {
8185
await sandbox.kill();
8286
}
8387
}
8488

85-
private async runApply(job: SandboxRunRecord): Promise<RunnerOutput> {
89+
private async runApply(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput> {
8690
const requestedVersion = job.payload.terraformVersion || "1.5.7";
87-
const requestedEngine = job.payload.engine || "terraform";
88-
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
89-
try {
90-
// Install IaC tool if using fallback template
91-
if (needsInstall) {
92-
await this.installIacTool(sandbox, requestedEngine, requestedVersion);
93-
}
94-
95-
const workDir = await this.setupWorkspace(sandbox, job);
96-
const logs: string[] = [];
91+
const requestedEngine = job.payload.engine || "terraform";
92+
const { sandbox, needsInstall } = await this.createSandbox(requestedVersion, requestedEngine);
93+
try {
94+
// Install IaC tool if using fallback template
95+
if (needsInstall) {
96+
await this.installIacTool(sandbox, requestedEngine, requestedVersion);
97+
}
98+
99+
const workDir = await this.setupWorkspace(sandbox, job);
100+
const logs: string[] = [];
101+
const streamLog = (chunk: string) => {
102+
if (!chunk) return;
103+
appendLog?.(chunk);
104+
};
97105

98106
// Run terraform init
99107
await this.runTerraformCommand(
100108
sandbox,
101109
workDir,
102110
["init", "-input=false", "-no-color"],
103111
logs,
112+
streamLog,
104113
);
105114

106115
// Run terraform apply/destroy
@@ -110,6 +119,7 @@ export class E2BSandboxRunner implements SandboxRunner {
110119
workDir,
111120
[applyCommand, "-auto-approve", "-input=false", "-no-color"],
112121
logs,
122+
streamLog,
113123
);
114124

115125
// Read the state file
@@ -119,7 +129,7 @@ export class E2BSandboxRunner implements SandboxRunner {
119129
state: Buffer.from(stateContent, "utf8").toString("base64"),
120130
};
121131

122-
return { logs: logs.join("\n"), result };
132+
return { logs: logs.join(""), result };
123133
} finally {
124134
await sandbox.kill();
125135
}
@@ -262,28 +272,41 @@ export class E2BSandboxRunner implements SandboxRunner {
262272
cwd: string,
263273
args: string[],
264274
logBuffer?: string[],
275+
appendLog?: (chunk: string) => void,
265276
): Promise<{ stdout: string; stderr: string }> {
266277
const engine = (sandbox as any)._requestedEngine || "terraform";
267278
const binaryName = engine === "tofu" ? "tofu" : "terraform";
268279
const cmdStr = `${binaryName} ${args.join(" ")}`;
269280
logger.info({ cmd: cmdStr, cwd, engine }, "running IaC command in E2B sandbox");
270281

282+
let sawStream = false;
283+
const pipeChunk = (chunk: string | undefined) => {
284+
if (!chunk) return;
285+
sawStream = true;
286+
if (logBuffer) {
287+
logBuffer.push(chunk);
288+
}
289+
appendLog?.(chunk);
290+
};
291+
271292
const result = await sandbox.commands.run(cmdStr, {
272293
cwd,
273294
envs: {
274295
TF_IN_AUTOMATION: "1",
275296
},
297+
onStdout: pipeChunk,
298+
onStderr: pipeChunk,
276299
});
277300

278301
const stdout = result.stdout;
279302
const stderr = result.stderr;
280303
const exitCode = result.exitCode;
281304

305+
// Fallback: if no stdout/stderr chunk was streamed, push the merged final output so the log is not left empty
282306
const mergedLogs = `${stdout}\n${stderr}`.trim();
283-
if (logBuffer && mergedLogs.length > 0) {
284-
logBuffer.push(mergedLogs);
307+
if (!sawStream && mergedLogs.length > 0) {
308+
pipeChunk(mergedLogs + "\n");
285309
}
286-
287310
if (exitCode !== 0) {
288311
throw new Error(
289312
`${binaryName} ${args[0]} exited with code ${exitCode}\n${mergedLogs}`,
@@ -330,4 +353,3 @@ export class E2BSandboxRunner implements SandboxRunner {
330353
}
331354
}
332355
}
333-

sandbox-sidecar/src/runners/types.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,5 @@ export interface RunnerOutput {
77

88
export interface SandboxRunner {
99
readonly name: string;
10-
run(job: SandboxRunRecord): Promise<RunnerOutput>;
10+
run(job: SandboxRunRecord, appendLog?: (chunk: string) => void): Promise<RunnerOutput>;
1111
}
12-

taco/internal/sandbox/e2b.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe
4848
if req == nil {
4949
return nil, fmt.Errorf("plan request cannot be nil")
5050
}
51-
51+
5252
// Validate engine field is set
5353
if req.Engine == "" {
5454
return nil, fmt.Errorf("engine field is required but was empty")
@@ -76,7 +76,7 @@ func (s *e2bSandbox) ExecutePlan(ctx context.Context, req *PlanRequest) (*PlanRe
7676
return nil, err
7777
}
7878

79-
status, err := s.waitForCompletion(ctx, jobID)
79+
status, err := s.waitForCompletion(ctx, jobID, req.LogSink)
8080
if err != nil {
8181
return nil, err
8282
}
@@ -109,7 +109,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl
109109
if req == nil {
110110
return nil, fmt.Errorf("apply request cannot be nil")
111111
}
112-
112+
113113
// Validate engine field is set
114114
if req.Engine == "" {
115115
return nil, fmt.Errorf("engine field is required but was empty")
@@ -137,7 +137,7 @@ func (s *e2bSandbox) ExecuteApply(ctx context.Context, req *ApplyRequest) (*Appl
137137
return nil, err
138138
}
139139

140-
status, err := s.waitForCompletion(ctx, jobID)
140+
status, err := s.waitForCompletion(ctx, jobID, req.LogSink)
141141
if err != nil {
142142
return nil, err
143143
}
@@ -167,7 +167,7 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin
167167
// Retry logic for transient failures (network issues, sidecar temporarily unavailable)
168168
maxRetries := 3
169169
var lastErr error
170-
170+
171171
for attempt := 1; attempt <= maxRetries; attempt++ {
172172
if attempt > 1 {
173173
// Exponential backoff: 1s, 2s, 4s
@@ -214,22 +214,30 @@ func (s *e2bSandbox) startRun(ctx context.Context, payload e2bRunRequest) (strin
214214
}
215215
return startResp.ID, nil
216216
}
217-
217+
218218
return "", fmt.Errorf("failed to start sandbox run after %d attempts: %w", maxRetries, lastErr)
219219
}
220220

221-
func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string) (*e2bRunStatusResponse, error) {
221+
func (s *e2bSandbox) waitForCompletion(ctx context.Context, runID string, onLog func(string)) (*e2bRunStatusResponse, error) {
222222
ctx, cancel := context.WithTimeout(ctx, s.cfg.PollTimeout)
223223
defer cancel()
224224

225225
ticker := time.NewTicker(s.cfg.PollInterval)
226226
defer ticker.Stop()
227227

228228
var lastErr error
229+
lastLen := 0
229230

230231
for {
231232
status, err := s.fetchStatus(ctx, runID)
232233
if err == nil {
234+
// Stream incremental log chunks as we see new bytes
235+
if onLog != nil && len(status.Logs) > lastLen {
236+
chunk := status.Logs[lastLen:]
237+
onLog(chunk)
238+
lastLen = len(status.Logs)
239+
}
240+
233241
switch strings.ToLower(status.Status) {
234242
case "succeeded", "completed", "done":
235243
return status, nil

taco/internal/sandbox/types.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ type PlanRequest struct {
1616
ConfigArchive []byte
1717
State []byte
1818
Metadata map[string]string
19+
// LogSink is an optional callback that receives incremental log chunks
20+
// as they are observed while polling the sandbox run.
21+
LogSink func(chunk string)
1922
}
2023

2124
// PlanResult captures the outcome of a sandboxed plan execution.
@@ -43,6 +46,9 @@ type ApplyRequest struct {
4346
ConfigArchive []byte
4447
State []byte
4548
Metadata map[string]string
49+
// LogSink is an optional callback that receives incremental log chunks
50+
// as they are observed while polling the sandbox run.
51+
LogSink func(chunk string)
4652
}
4753

4854
// ApplyResult captures the outcome of a sandboxed apply.

taco/internal/tfe/apply.go

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -100,49 +100,25 @@ func (h *TfeHandler) GetApplyLogs(c echo.Context) error {
100100
return c.JSON(http.StatusNotFound, map[string]string{"error": "apply not found"})
101101
}
102102

103-
// Try to get apply logs from blob storage
104-
var logText string
105-
applyLogBlobID := fmt.Sprintf("runs/%s/apply-logs.txt", run.ID)
106-
107-
logData, err := h.blobStore.DownloadBlob(ctx, applyLogBlobID)
108-
if err == nil {
109-
logText = string(logData)
110-
} else {
111-
// If logs don't exist yet, return placeholder
112-
if run.Status == "applying" || run.Status == "apply_queued" {
113-
logText = "Waiting for apply to start...\n"
114-
} else {
115-
logText = "Apply logs not available\n"
116-
}
117-
}
118-
119-
// Handle offset for streaming with proper byte accounting
120-
// Stream format: [STX at offset 0][logText at offset 1+][ETX at offset 1+len(logText)]
121-
var responseData []byte
122-
123-
if offsetInt == 0 {
124-
// First request: send STX + current logs
125-
responseData = append([]byte{0x02}, []byte(logText)...)
126-
fmt.Printf("📤 APPLY LOGS at offset=0: STX + %d bytes of log text\n", len(logText))
127-
} else {
128-
// Client already received STX (1 byte at offset 0)
129-
// Map stream offset to logText offset: streamOffset=1 → logText[0]
130-
logOffset := offsetInt - 1
131-
132-
if logOffset < int64(len(logText)) {
133-
// Send remaining log text
134-
responseData = []byte(logText[logOffset:])
135-
fmt.Printf("📤 APPLY LOGS at offset=%d: sending %d bytes (logText[%d:])\n",
136-
offsetInt, len(responseData), logOffset)
137-
} else if logOffset == int64(len(logText)) && run.Status == "applied" {
138-
// All logs sent, send ETX
139-
responseData = []byte{0x03}
140-
fmt.Printf("📤 Sending ETX (End of Text) for apply %s - logs complete\n", applyID)
141-
} else {
142-
// Waiting for more logs or already sent ETX
143-
responseData = []byte{}
144-
fmt.Printf("📤 APPLY LOGS at offset=%d: no new data (waiting or complete)\n", offsetInt)
145-
}
103+
responseData, err := streamChunkedLogs(ctx, h.blobStore, logStreamOptions{
104+
Prefix: "applies",
105+
Label: "APPLY",
106+
ID: run.ID,
107+
Offset: offsetInt,
108+
ChunkSize: 2 * 1024,
109+
GenerateDefaultText: func() string {
110+
if run.Status == "applying" || run.Status == "apply_queued" {
111+
return "Waiting for apply to start...\n"
112+
}
113+
return "Apply logs not available\n"
114+
},
115+
IsComplete: func() bool {
116+
return run.Status == "applied" || run.Status == "errored"
117+
},
118+
AppendETXOnFirst: true, // If already complete on first request, send ETX immediately
119+
})
120+
if err != nil {
121+
return err
146122
}
147123

148124
c.Response().Header().Set("Content-Type", "text/plain")

0 commit comments

Comments
 (0)