Skip to content

Commit 2bb1082

Browse files
juliusmarmingecodex
authored andcommitted
Use lazy stream accessors for provider runtime events (pingdotgg#1746)
Co-authored-by: codex <codex@users.noreply.github.com>
1 parent 0196390 commit 2bb1082

14 files changed

Lines changed: 176 additions & 55 deletions

apps/server/integration/OrchestrationEngineHarness.integration.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointRea
4848
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
4949
import { OrchestrationProjectionPipelineLive } from "../src/orchestration/Layers/ProjectionPipeline.ts";
5050
import { OrchestrationProjectionSnapshotQueryLive } from "../src/orchestration/Layers/ProjectionSnapshotQuery.ts";
51-
import { RuntimeReceiptBusLive } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
51+
import { RuntimeReceiptBusTest } from "../src/orchestration/Layers/RuntimeReceiptBus.ts";
5252
import { OrchestrationReactorLive } from "../src/orchestration/Layers/OrchestrationReactor.ts";
5353
import { ProviderCommandReactorLive } from "../src/orchestration/Layers/ProviderCommandReactor.ts";
5454
import { ProviderRuntimeIngestionLive } from "../src/orchestration/Layers/ProviderRuntimeIngestion.ts";
@@ -297,7 +297,7 @@ export const makeOrchestrationIntegrationHarness = (
297297
ProjectionPendingApprovalRepositoryLive,
298298
checkpointStoreLayer,
299299
providerLayer,
300-
RuntimeReceiptBusLive,
300+
RuntimeReceiptBusTest,
301301
);
302302
const serverSettingsLayer = ServerSettingsService.layerTest();
303303
const runtimeIngestionLayer = ProviderRuntimeIngestionLive.pipe(
@@ -376,7 +376,7 @@ export const makeOrchestrationIntegrationHarness = (
376376
runtime.runPromise(reactor.start().pipe(Scope.provide(scope))),
377377
).pipe(Effect.orDie);
378378
const receiptHistory = yield* Ref.make<ReadonlyArray<OrchestrationRuntimeReceipt>>([]);
379-
yield* Stream.runForEach(runtimeReceiptBus.stream, (receipt) =>
379+
yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) =>
380380
Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid),
381381
).pipe(Effect.forkIn(scope));
382382
yield* Effect.sleep(10);

apps/server/integration/providerService.integration.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ const collectEventsDuring = <A, E, R>(
8686
Effect.forkScoped,
8787
);
8888

89+
yield* Effect.sleep("50 millis");
8990
yield* action;
9091

9192
return yield* Effect.forEach(
@@ -104,7 +105,6 @@ const runTurn = (input: {
104105
}) =>
105106
Effect.gen(function* () {
106107
yield* input.harness.queueTurnResponse(input.threadId, input.response);
107-
108108
return yield* collectEventsDuring(
109109
input.provider.streamEvents,
110110
input.response.events.length,
@@ -116,7 +116,7 @@ const runTurn = (input: {
116116
);
117117
});
118118

119-
it.effect("replays typed runtime fixture events", () =>
119+
it.live("replays typed runtime fixture events", () =>
120120
Effect.gen(function* () {
121121
const fixture = yield* makeIntegrationFixture;
122122

@@ -149,7 +149,7 @@ it.effect("replays typed runtime fixture events", () =>
149149
}).pipe(Effect.provide(NodeServices.layer)),
150150
);
151151

152-
it.effect("replays file-changing fixture turn events", () =>
152+
it.live("replays file-changing fixture turn events", () =>
153153
Effect.gen(function* () {
154154
const fixture = yield* makeIntegrationFixture;
155155
const { join } = yield* Path.Path;
@@ -188,7 +188,7 @@ it.effect("replays file-changing fixture turn events", () =>
188188
}).pipe(Effect.provide(NodeServices.layer)),
189189
);
190190

191-
it.effect("runs multi-turn tool/approval flow", () =>
191+
it.live("runs multi-turn tool/approval flow", () =>
192192
Effect.gen(function* () {
193193
const fixture = yield* makeIntegrationFixture;
194194
const { join } = yield* Path.Path;
@@ -242,7 +242,7 @@ it.effect("runs multi-turn tool/approval flow", () =>
242242
}).pipe(Effect.provide(NodeServices.layer)),
243243
);
244244

245-
it.effect("rolls back provider conversation state only", () =>
245+
it.live("rolls back provider conversation state only", () =>
246246
Effect.gen(function* () {
247247
const fixture = yield* makeIntegrationFixture;
248248
const { join } = yield* Path.Path;

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ function createProviderServiceHarness(
9696
listSessions,
9797
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" } as any),
9898
rollbackConversation,
99-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
99+
get streamEvents() {
100+
return Stream.fromPubSub(runtimeEventPubSub);
101+
},
100102
};
101103

102104
const emit = (event: LegacyProviderRuntimeEvent): void => {

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ describe("ProviderCommandReactor", () => {
207207
persistentRuntime: true,
208208
}),
209209
rollbackConversation: () => unsupported(),
210-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
210+
get streamEvents() {
211+
return Stream.fromPubSub(runtimeEventPubSub);
212+
},
211213
};
212214

213215
const orchestrationLayer = OrchestrationEngineLive.pipe(

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ function createProviderServiceHarness() {
9999
listSessions: () => Effect.succeed([...runtimeSessions]),
100100
getCapabilities: () => Effect.succeed(getProviderCapabilities("codex")),
101101
rollbackConversation: () => unsupported(),
102-
streamEvents: Stream.fromPubSub(runtimeEventPubSub),
102+
get streamEvents() {
103+
return Stream.fromPubSub(runtimeEventPubSub);
104+
},
103105
};
104106

105107
const setSession = (session: ProviderSession): void => {
Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
/**
2+
* RuntimeReceiptBus layers.
3+
*
4+
* `RuntimeReceiptBusLive` is the production default and intentionally does not
5+
* retain or broadcast receipts. `RuntimeReceiptBusTest` installs the in-memory
6+
* PubSub-backed implementation used by integration tests that need to await
7+
* checkpoint-reactor milestones precisely.
8+
*
9+
* @module RuntimeReceiptBus
10+
*/
111
import { Effect, Layer, PubSub, Stream } from "effect";
212

313
import {
@@ -6,13 +16,21 @@ import {
616
type OrchestrationRuntimeReceipt,
717
} from "../Services/RuntimeReceiptBus.ts";
818

9-
const makeRuntimeReceiptBus = Effect.gen(function* () {
19+
const makeRuntimeReceiptBus = Effect.succeed({
20+
publish: () => Effect.void,
21+
streamEventsForTest: Stream.empty,
22+
} satisfies RuntimeReceiptBusShape);
23+
24+
const makeRuntimeReceiptBusTest = Effect.gen(function* () {
1025
const pubSub = yield* PubSub.unbounded<OrchestrationRuntimeReceipt>();
1126

1227
return {
1328
publish: (receipt) => PubSub.publish(pubSub, receipt).pipe(Effect.asVoid),
14-
stream: Stream.fromPubSub(pubSub),
29+
get streamEventsForTest() {
30+
return Stream.fromPubSub(pubSub);
31+
},
1532
} satisfies RuntimeReceiptBusShape;
1633
});
1734

1835
export const RuntimeReceiptBusLive = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBus);
36+
export const RuntimeReceiptBusTest = Layer.effect(RuntimeReceiptBus, makeRuntimeReceiptBusTest);

apps/server/src/orchestration/Services/RuntimeReceiptBus.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/**
2+
* RuntimeReceiptBus - Internal checkpoint-reactor synchronization receipts.
3+
*
4+
* This service exists to expose short-lived orchestration milestones that are
5+
* useful in tests and harnesses but are not part of the production runtime
6+
* event model. `CheckpointReactor` publishes receipts such as baseline capture,
7+
* diff finalization, and turn-processing quiescence so integration tests can
8+
* wait for those exact points without inferring them indirectly from persisted
9+
* state.
10+
*
11+
* Production code should only call `publish`. Test code may subscribe via
12+
* `streamEventsForTest`, which is intentionally named to make the intended
13+
* usage explicit.
14+
*
15+
* @module RuntimeReceiptBus
16+
*/
117
import { CheckpointRef, IsoDateTime, NonNegativeInt, ThreadId, TurnId } from "@t3tools/contracts";
218
import { Schema, ServiceMap } from "effect";
319
import type { Effect, Stream } from "effect";
@@ -40,7 +56,7 @@ export type OrchestrationRuntimeReceipt = typeof OrchestrationRuntimeReceipt.Typ
4056

4157
export interface RuntimeReceiptBusShape {
4258
readonly publish: (receipt: OrchestrationRuntimeReceipt) => Effect.Effect<void>;
43-
readonly stream: Stream.Stream<OrchestrationRuntimeReceipt>;
59+
readonly streamEventsForTest: Stream.Stream<OrchestrationRuntimeReceipt>;
4460
}
4561

4662
export class RuntimeReceiptBus extends ServiceMap.Service<

apps/server/src/provider/Layers/ClaudeAdapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3130,7 +3130,9 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) {
31303130
listSessions,
31313131
hasSession,
31323132
stopAll,
3133-
streamEvents: Stream.fromQueue(runtimeEventQueue),
3133+
get streamEvents() {
3134+
return Stream.fromQueue(runtimeEventQueue);
3135+
},
31343136
} satisfies ClaudeAdapterShape;
31353137
});
31363138
}

apps/server/src/provider/Layers/CodexAdapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1640,7 +1640,9 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
16401640
listSessions,
16411641
hasSession,
16421642
stopAll,
1643-
streamEvents: Stream.fromQueue(runtimeEventQueue),
1643+
get streamEvents() {
1644+
return Stream.fromQueue(runtimeEventQueue);
1645+
},
16441646
} satisfies CodexAdapterShape;
16451647
});
16461648

apps/server/src/provider/Layers/EventNdjsonLogger.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,49 @@ describe("EventNdjsonLogger", () => {
112112
}),
113113
);
114114

115+
it.effect("serializes concurrent first writes for the same segment", () =>
116+
Effect.gen(function* () {
117+
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));
118+
const basePath = path.join(tempDir, "provider-canonical.ndjson");
119+
120+
try {
121+
const logger = yield* makeEventNdjsonLogger(basePath, {
122+
stream: "canonical",
123+
batchWindowMs: 0,
124+
});
125+
assert.notEqual(logger, undefined);
126+
if (!logger) {
127+
return;
128+
}
129+
130+
yield* Effect.all(
131+
[
132+
logger.write({ id: "evt-concurrent-1" }, null),
133+
logger.write({ id: "evt-concurrent-2" }, null),
134+
],
135+
{ concurrency: "unbounded" },
136+
);
137+
yield* logger.close();
138+
139+
const globalPath = path.join(tempDir, "_global.log");
140+
assert.equal(fs.existsSync(globalPath), true);
141+
const lines = fs
142+
.readFileSync(globalPath, "utf8")
143+
.trim()
144+
.split("\n")
145+
.map((line) => parseLogLine(line));
146+
147+
assert.equal(lines.length, 2);
148+
assert.deepEqual(lines.map((line) => line.payload).toSorted(), [
149+
'{"id":"evt-concurrent-1"}',
150+
'{"id":"evt-concurrent-2"}',
151+
]);
152+
} finally {
153+
fs.rmSync(tempDir, { recursive: true, force: true });
154+
}
155+
}),
156+
);
157+
115158
it.effect("rotates per-thread files when max size is exceeded", () =>
116159
Effect.gen(function* () {
117160
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-provider-log-"));

0 commit comments

Comments
 (0)